diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d0425907f6a..02e6cd0c807 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -360,17 +362,8 @@ protected void recoverApplication(ApplicationStateData appState, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery, long startTime) throws YarnException { - if (!isRecovery) { - // Do queue mapping - if (rmContext.getQueuePlacementManager() != null) { - // We only do queue mapping when it's a new application - rmContext.getQueuePlacementManager().placeApplication( - submissionContext, user); - } - // fail the submission if configured application timeout value is invalid - RMServerUtils.validateApplicationTimeouts( - 
submissionContext.getApplicationTimeouts()); - } + + ApplicationPlacementContext placementContext = placeApplication(submissionContext, user, isRecovery); ApplicationId applicationId = submissionContext.getApplicationId(); List amReqs = validateAndCreateResourceRequest( @@ -418,7 +411,7 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReqs, startTime); + submissionContext.getApplicationTags(), amReqs, placementContext, startTime); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -763,4 +756,31 @@ private void updateAppDataToStateStore(String queue, RMApp app, + "' with below exception:" + ex.getMessage()); } } + + private ApplicationPlacementContext placeApplication(ApplicationSubmissionContext submissionContext, String user, boolean isRecovery) throws YarnException { + ApplicationPlacementContext placementContext = null; + if (!isRecovery) { + // Do queue mapping + if (rmContext.getQueuePlacementManager() != null) { + // We only do queue mapping when it's a new application + placementContext = rmContext.getQueuePlacementManager().placeApplication( + submissionContext, user); + // Set it to ApplicationSubmissionContext + if (placementContext != null) { + LOG.info("Placed application=" + submissionContext.getApplicationId() + " to queue=" + + placementContext.getQueue() + ", original queue=" + submissionContext.getQueue()); + submissionContext.setQueue(placementContext.getQueue()); + } + } + // fail the submission if configured application timeout value is invalid + RMServerUtils.validateApplicationTimeouts( + submissionContext.getApplicationTimeouts()); + } else { + if (rmContext.getQueuePlacementManager() != null) { + placementContext = 
rmContext.getQueuePlacementManager().placeApplication( + submissionContext, user); + } + } + return placementContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java new file mode 100644 index 00000000000..ba34aa0c94a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+/**
+ * Each placement rule, when it successfully places an application onto a
+ * queue, returns an ApplicationPlacementContext holding the rule that was
+ * applied, the mapped queue and, if the queue mapping names one, its parent
+ * queue. The rule and parent queue are null when not supplied; immutable.
+ */
+public class ApplicationPlacementContext {
+
+  private final PlacementRule rule;
+  private final String queue;
+  private final String parentQueue;
+
+  /** Creates a context with only a queue; rule and parent queue are null. */
+  public ApplicationPlacementContext(String queue) {
+    this(queue, null, null);
+  }
+
+  /** Creates a context for a queue that sits under the given parent queue. */
+  public ApplicationPlacementContext(String queue, PlacementRule rule,
+      String parentQueue) {
+    this.rule = rule;
+    this.queue = queue;
+    this.parentQueue = parentQueue;
+  }
+
+  /** Creates a context with a queue and rule; the parent queue is null. */
+  public ApplicationPlacementContext(String queue, PlacementRule rule) {
+    this(queue, rule, null);
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public String getParentQueue() {
+    return parentQueue;
+  }
+
+  public boolean hasParentQueue() {
+    return parentQueue != null;
+  }
+
+  public PlacementRule getRule() {
+    return rule;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index 43a4deb70eb..756ce18b586 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -23,7 +23,6 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,36 +52,32 @@ public void updateRules(List rules) { } } - public void placeApplication(ApplicationSubmissionContext asc, String user) - throws YarnException { + public ApplicationPlacementContext placeApplication( + ApplicationSubmissionContext asc, String user) throws YarnException { + try { readLock.lock(); if (null == rules || rules.isEmpty()) { - return; + return null; } - - String newQueueName = null; + + ApplicationPlacementContext placement = null; for (PlacementRule rule : rules) { - newQueueName = rule.getQueueForApp(asc, user); - if (newQueueName != null) { + placement = rule.getPlacementForApp(asc, user); + if (placement != null) { break; } } - + // Failed to get where to place application - if (null == newQueueName && null == asc.getQueue()) { - String msg = "Failed to get where to place application=" - + asc.getApplicationId(); + if (null == placement && null == asc.getQueue()) { + String msg = "Failed to get where to place application=" + asc + .getApplicationId(); LOG.error(msg); throw new YarnException(msg); } - - // Set it to ApplicationSubmissionContext - if (!StringUtils.equals(asc.getQueue(), newQueueName)) { - LOG.info("Placed application=" + asc.getApplicationId() + " to queue=" - + newQueueName + ", original queue=" + asc.getQueue()); - asc.setQueue(newQueueName); - } + + return placement; } finally { readLock.unlock(); } @@ -92,4 +87,6 @@ public void placeApplication(ApplicationSubmissionContext asc, String user) public List getPlacementRules() { return rules; } + + } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java index 47dc48a51c4..5e31b6277bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; public abstract class PlacementRule { + public String getName() { return this.getClass().getName(); } @@ -50,6 +51,6 @@ public void initialize(Map parameters, RMContext rmContext) * in the {@link PlacementManager} will take care *

*/ - public abstract String getQueueForApp(ApplicationSubmissionContext asc, + public abstract ApplicationPlacementContext getPlacementForApp(ApplicationSubmissionContext asc, String user) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index d617d161859..a19593470a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; public class UserGroupMappingPlacementRule extends PlacementRule { private static final Log LOG = LogFactory @@ -66,17 +77,40 @@ public String toString() { MappingType type; String source; String queue; + String parentQueue; + + public final String DELIMITER = ":"; public QueueMapping(MappingType type, String source, String queue) { this.type = type; this.source = source; this.queue = queue; + this.parentQueue = null; } - + + public QueueMapping(MappingType type, String source, String queue, String parentQueue) { + this.type = type; + this.source = source; + this.queue = queue; + this.parentQueue = parentQueue; + } + public String getQueue() { return queue; } - + + public String getParentQueue() { + return parentQueue; + } + + public MappingType getType() { + return type; + } + + public String getSource() { + return source; + } + @Override public int hashCode() { return super.hashCode(); @@ -93,6 +127,13 @@ public boolean equals(Object obj) { return false; } } + + public String toString() { + return type.toString() + DELIMITER + source + DELIMITER + + (parentQueue != null ? + parentQueue + "." 
+ queue : + queue); + } } public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, @@ -102,26 +143,26 @@ public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, this.groups = groups; } - private String getMappedQueue(String user) throws IOException { + private ApplicationPlacementContext getPlacementForUser(String user) throws IOException { for (QueueMapping mapping : mappings) { if (mapping.type == MappingType.USER) { if (mapping.source.equals(CURRENT_USER_MAPPING)) { if (mapping.queue.equals(CURRENT_USER_MAPPING)) { - return user; + return getPlacementContext(mapping, user); } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - return groups.getGroups(user).get(0); + return getPlacementContext(mapping, groups.getGroups(user).get(0)); } else { - return mapping.queue; + return getPlacementContext(mapping); } } if (user.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } if (mapping.type == MappingType.GROUP) { for (String userGroups : groups.getGroups(user)) { if (userGroups.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } } @@ -130,13 +171,13 @@ private String getMappedQueue(String user) throws IOException { } @Override - public String getQueueForApp(ApplicationSubmissionContext asc, String user) + public ApplicationPlacementContext getPlacementForApp(ApplicationSubmissionContext asc, String user) throws YarnException { String queueName = asc.getQueue(); ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - String mappedQueue = getMappedQueue(user); + ApplicationPlacementContext mappedQueue = getPlacementForUser(user); if (mappedQueue != null) { // We have a mapping, should we use it? 
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -153,8 +194,202 @@ public String getQueueForApp(ApplicationSubmissionContext asc, String user) throw new YarnException(message); } } - - return queueName; + return null; + } + + private ApplicationPlacementContext getPlacementContext(QueueMapping mapping) { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, String leafQueueName) { + if (!StringUtils.isEmpty(mapping.parentQueue)) { + return new ApplicationPlacementContext(leafQueueName, this, mapping.getParentQueue()); + } else { + return new ApplicationPlacementContext(leafQueueName, this); + } + } + + @VisibleForTesting + public static UserGroupMappingPlacementRule get(CapacitySchedulerContext schedulerContext) + throws IOException { + CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized queue mappings, override: " + overrideWithQueueMappings); + + List queueMappings = conf.getQueueMappings(); + + // Get new user/group mappings + List newMappings = new ArrayList<>(); + + CapacitySchedulerQueueManager queueManager = + schedulerContext.getCapacitySchedulerQueueManager(); + + // check if mappings refer to valid queues + for (QueueMapping mapping : queueMappings) { + + QueuePath queuePath = extractQueuePath(mapping.getQueue()); + if (isStaticQueueMapping(mapping)) { + //Try getting queue by its leaf queue name without splitting into parent/leaf queues + CSQueue queue = queueManager.getQueue(mapping.getQueue()); + if (ifQueueDoesNotExist(queue)) { + //Try getting the queue by extracting leaf and parent queue names + //Assuming its a potential auto created leaf queue + queue = queueManager.getQueue(queuePath.getLeafQueue()); + + if (ifQueueDoesNotExist(queue)) { + //if leaf queue does not exist, this could be a potential auto created leaf queue 
+ //validate if parent queue is specified, then it should exist and be an instance of AutoCreateEnabledParentQueue + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(queueManager, + mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.getQueue()); + } + newMappings.add(newMapping); + } else { + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + // if queue exists, validate + // if its an instance of leaf queue + // if its an instance of auto created leaf queue, then extract parent queue name and update queue mapping + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + //If it is a dynamic queue mapping, + // we can safely assume leaf queue name does not have '.' in it + // validate + // if parent queue is specified, then + // parent queue exists and an instance of AutoCreateEnabledParentQueue + // + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(queueManager, + mapping, queuePath); + if (newMapping != null) { + newMappings.add(newMapping); + } else{ + newMappings.add(mapping); + } + } + } + + // initialize groups if mappings are present + if (newMappings.size() > 0) { + Groups groups = new Groups(conf); + return new UserGroupMappingPlacementRule(overrideWithQueueMappings, + newMappings, groups); + } + + return null; + } + + private static QueueMapping validateAndGetQueueMapping(CapacitySchedulerQueueManager queueManager, CSQueue queue, QueueMapping mapping, QueuePath queuePath) + throws IOException { + if (!(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue : " + mapping.getQueue()); + } + + if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + QueueMapping newMapping = 
validateAndGetAutoCreatedQueueMapping(queueManager, + mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.getQueue()); + } + return newMapping; + } + return mapping; + } + + private static boolean ifQueueDoesNotExist(CSQueue queue) { + return queue == null; + } + + private static QueueMapping validateAndGetAutoCreatedQueueMapping(CapacitySchedulerQueueManager queueManager, QueueMapping mapping, QueuePath queuePath) + throws IOException { + if (queuePath.hasParentQueue()) { + //if parent queue is specified, then it should exist and be an instance of ManagedParentQueue + validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()), + queuePath.getParentQueue(), queuePath.getLeafQueue()); + return new QueueMapping(mapping.getType(), mapping.getSource(), queuePath.getLeafQueue(), queuePath.getParentQueue()); + } + + return null; + } + + private static boolean isStaticQueueMapping(QueueMapping mapping) { + return !mapping.getQueue().contains( + UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping.getQueue() + .contains(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING); + } + + private static class QueuePath { + + public String parentQueue; + public String leafQueue; + + public QueuePath(final String leafQueue) { + this.leafQueue = leafQueue; + } + + public QueuePath(final String parentQueue, final String leafQueue) { + this.parentQueue = parentQueue; + this.leafQueue = leafQueue; + } + + public String getParentQueue() { + return parentQueue; + } + + public String getLeafQueue() { + return leafQueue; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } + + @Override + public String toString() { + return parentQueue + DOT + leafQueue; + } + } + + private static QueuePath extractQueuePath(String queueName) throws IOException { + int parentQueueNameEndIndex = queueName.lastIndexOf(DOT); + + if ( parentQueueNameEndIndex > -1) { + final String 
parentQueue = queueName.substring(0, parentQueueNameEndIndex).trim(); + final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1).trim(); + return new QueuePath(parentQueue, leafQueue); + } + + return new QueuePath(queueName); + } + + + private static void validateParentQueue(CSQueue parentQueue, String parentQueueName, String leafQueueName) throws IOException { + if (parentQueue == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue [" + parentQueueName + "]"); + } else if (!(parentQueue instanceof ManagedParentQueue)) { + throw new IOException("mapping contains leaf queue [" + leafQueueName + + "] and invalid parent queue which does not have auto creation of leaf queues enabled [" + + parentQueueName + "]"); + } else if (parentQueue instanceof ManagedParentQueue + && !parentQueue.getQueueName().equals(parentQueueName)) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue which does not match existing leaf queue's parent : [" + + parentQueueName + "] does not match [ " + parentQueue + .getQueueName() + "]"); + } } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 551f075edb5..2e166890402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -28,10 +28,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -92,8 +92,8 @@ protected void addReservationQueue( String planQueueName, Queue queue, String currResId) { PlanQueue planQueue = (PlanQueue)queue; try { - ReservationQueue resQueue = - new ReservationQueue(cs, currResId, planQueue); + AutoCreatedLeafQueue resQueue = + new AutoCreatedLeafQueue(cs, currResId, planQueue); cs.addQueue(resQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( @@ -112,8 +112,8 @@ protected void createDefaultReservationQueue( PlanQueue planQueue = (PlanQueue)queue; if (cs.getQueue(defReservationId) == null) { try { - ReservationQueue defQueue = - new ReservationQueue(cs, defReservationId, planQueue); + AutoCreatedLeafQueue defQueue = + new AutoCreatedLeafQueue(cs, defReservationId, planQueue); cs.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 93c41b6747c..76bc1da55fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -301,4 +302,9 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return True/False to confirm whether app is in final states */ boolean isAppInCompletedStates(); + + /** + * Get application's queue placement context + */ + ApplicationPlacementContext getApplicationQueuePlacementContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cfb8a74f59a..200a2e5bec4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -150,6 +151,7 @@ private final Set applicationTags; private final long attemptFailuresValidityInterval; + private final ApplicationPlacementContext placementContext; private boolean amBlacklistingEnabled = false; private float blacklistDisableThreshold; @@ -157,6 +159,8 @@ private boolean isNumAttemptsBeyondThreshold = false; + + // Mutable fields private long startTime; private long finishTime = 0; @@ -412,7 +416,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, List amReqs) { this(applicationId, rmContext, config, name, user, queue, submissionContext, scheduler, masterService, submitTime, applicationType, applicationTags, - amReqs, -1); + amReqs, null, -1); } public RMAppImpl(ApplicationId applicationId, RMContext rmContext, @@ -420,7 +424,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, String applicationType, Set applicationTags, - List amReqs, 
long startTime) { + List amReqs, ApplicationPlacementContext placementContext, long startTime) { this.systemClock = SystemClock.getInstance(); @@ -520,6 +524,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; } } + + this.placementContext = placementContext; } /** @@ -2004,6 +2010,11 @@ public void setApplicationPriority(Priority applicationPriority) { this.applicationPriority = applicationPriority; } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + return this.placementContext; + } + /** * Clear Unused fields to free memory. * @param app diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250f4e6b9a7..d1f6402da61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -78,7 +78,7 @@ final String queueName; private final String queuePath; volatile int numContainers; - + final Resource minimumAllocation; volatile Resource maximumAllocation; private volatile QueueState state = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java new file mode 100644 index 00000000000..b3d1b4738d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A container class for automatically created child leaf queues.
+ * From the user perspective this is equivalent to a LeafQueue,
+ * but functionality wise is a sub-class of ParentQueue.
+ */
+public abstract class AbstractManagedParentQueue extends ParentQueue {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractManagedParentQueue.class);
+
+  private int maxAppsForAutoCreatedQueues;
+  private int maxAppsPerUserForAutoCreatedQueues;
+  private int userLimit;
+  private float userLimitFactor;
+
+  /**
+   * Creates the managed parent queue, sets up its queue configs from the
+   * current cluster resource, initializes the limits handed to auto created
+   * leaf queues and logs a summary of the resulting settings.
+   *
+   * @param cs scheduler context
+   * @param queueName name of this queue
+   * @param parent parent of this queue
+   * @param old previous instance of this queue, passed through to the
+   *            ParentQueue constructor
+   * @throws IOException if the ParentQueue setup fails
+   */
+  public AbstractManagedParentQueue(CapacitySchedulerContext cs,
+      String queueName, CSQueue parent, CSQueue old) throws IOException {
+    super(cs, queueName, parent, old);
+
+    super.setupQueueConfigs(csContext.getClusterResource());
+    initializeLeafQueueConfigs();
+
+    // Method-local buffer, so the unsynchronized StringBuilder is enough.
+    StringBuilder queueInfo = new StringBuilder();
+    queueInfo.append("Created Managed Parent Queue: ").append(queueName)
+        .append("\nof type : [").append(getClass())
+        .append("]\nwith capacity: [")
+        .append(super.getCapacity()).append("]\nwith max capacity: [")
+        .append(super.getMaximumCapacity()).append("]\nwith max apps: [")
+        .append(getMaxApplicationsForAutoCreatedQueues())
+        .append("]\nwith max apps per user: [")
+        .append(getMaxApplicationsPerUserForAutoCreatedQueues())
+        .append("]\nwith user limit: [").append(getUserLimit())
+        .append("]\nwith user limit factor: [")
+        .append(getUserLimitFactor()).append("].");
+    LOG.info(queueInfo.toString());
+  }
+
+  /**
+   * Re-reads this queue's configs and the leaf queue template limits, then
+   * reinitializes every existing child queue against itself.
+   * The {@code newlyParsedQueue} argument is not consulted by this method.
+   *
+   * @param newlyParsedQueue newly parsed queue (unused here)
+   * @param clusterResource current cluster resource
+   * @throws IOException if reinitializing this queue or a child fails
+   */
+  @Override
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+      throws IOException {
+    // Acquire before entering try so finally only unlocks a held lock.
+    writeLock.lock();
+    try {
+      // Set new configs
+      setupQueueConfigs(clusterResource);
+
+      initializeLeafQueueConfigs();
+
+      // run reinitialize on each existing queue, to trigger absolute cap
+      // recomputations
+      for (CSQueue res : this.getChildQueues()) {
+        res.reinitialize(res, clusterResource);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Initialize leaf queue configs from template configurations specified on
+   * parent queue.
+   */
+  protected void initializeLeafQueueConfigs() {
+
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+
+    final String queuePath = super.getQueuePath();
+    int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
+    if (maxApps < 0) {
+      // Nothing configured for this queue: scale the system-wide default by
+      // this queue's absolute capacity.
+      maxApps = (int) (
+          CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
+              * getAbsoluteCapacity());
+    }
+    userLimit = conf.getUserLimit(queuePath);
+    userLimitFactor = conf.getUserLimitFactor(queuePath);
+    maxAppsForAutoCreatedQueues = maxApps;
+    maxAppsPerUserForAutoCreatedQueues =
+        (int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
+
+  }
+
+  /**
+   * Number of maximum applications for each of the auto created leaf queues.
+   *
+   * @return maxAppsForAutoCreatedQueues
+   */
+  public int getMaxApplicationsForAutoCreatedQueues() {
+    return maxAppsForAutoCreatedQueues;
+  }
+
+  /**
+   * Number of maximum applications per user for each of the auto created
+   * leaf queues.
+   *
+   * @return maxAppsPerUserForAutoCreatedQueues
+   */
+  public int getMaxApplicationsPerUserForAutoCreatedQueues() {
+    return maxAppsPerUserForAutoCreatedQueues;
+  }
+
+  /**
+   * User limit value for each of the auto created leaf queues.
+   *
+   * @return userLimit
+   */
+  public int getUserLimitForAutoCreatedQueues() {
+    return userLimit;
+  }
+
+  /**
+   * User limit factor value for each of the auto created leaf queues.
+   *
+   * @return userLimitFactor
+   */
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
+  /**
+   * Same value as {@link #getMaxApplicationsForAutoCreatedQueues()}.
+   *
+   * @return maxAppsForAutoCreatedQueues
+   */
+  public int getMaxAppsForAutoCreatedQueues() {
+    return maxAppsForAutoCreatedQueues;
+  }
+
+  /**
+   * Same value as {@link #getMaxApplicationsPerUserForAutoCreatedQueues()}.
+   *
+   * @return maxAppsPerUserForAutoCreatedQueues
+   */
+  public int getMaxAppsPerUserForAutoCreatedQueues() {
+    return maxAppsPerUserForAutoCreatedQueues;
+  }
+
+  /**
+   * Same value as {@link #getUserLimitForAutoCreatedQueues()}.
+   *
+   * @return userLimit
+   */
+  public int getUserLimit() {
+    return userLimit;
+  }
+
+  /**
+   * Add the specified child queue.
+   * @param childQueue reference to the child queue to be added
+   * @throws SchedulerDynamicEditException if the queue being added has a
+   *           capacity greater than zero
+   */
+  public void addChildQueue(CSQueue childQueue)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      if (childQueue.getCapacity() > 0) {
+        throw new SchedulerDynamicEditException(
+            "Queue " + childQueue + " being added has non zero capacity.");
+      }
+      boolean added = this.childQueues.add(childQueue);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("updateChildQueues (action: add queue): " + added + " "
+            + getChildQueuesToPrint());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove the specified child queue.
+   * @param childQueue reference to the child queue to be removed
+   * @throws SchedulerDynamicEditException if the queue being removed has a
+   *           capacity greater than zero
+   */
+  public void removeChildQueue(CSQueue childQueue)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      if (childQueue.getCapacity() > 0) {
+        throw new SchedulerDynamicEditException(
+            "Queue " + childQueue + " being removed has non zero capacity.");
+      }
+      Iterator<CSQueue> qiter = childQueues.iterator();
+      while (qiter.hasNext()) {
+        CSQueue cs = qiter.next();
+        if (cs.equals(childQueue)) {
+          qiter.remove();
+          // Name is passed as an argument so the {} placeholder is filled.
+          LOG.debug("Removed child queue: {}", cs.getQueueName());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove the specified child queue.
+   * @param childQueueName name of the child queue to be removed
+   * @return the queue that was looked up by name and removed
+   * @throws SchedulerDynamicEditException if no queue with that name is
+   *           known to the queue manager, or the queue has a capacity
+   *           greater than zero
+   */
+  public CSQueue removeChildQueue(String childQueueName)
+      throws SchedulerDynamicEditException {
+    CSQueue childQueue;
+    writeLock.lock();
+    try {
+      childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
+          childQueueName);
+      if (childQueue != null) {
+        removeChildQueue(childQueue);
+      } else {
+        throw new SchedulerDynamicEditException("Cannot find queue to delete "
+            + ": " + childQueueName);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    return childQueue;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
similarity index 63%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 3d1b7317489..4eb7cdd9d95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,35 +18,35 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.io.IOException; - import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** - * This represents a dynamic {@link LeafQueue} managed by the - * {@link ReservationSystem} - * + * Leaf queues which are auto created by an underkying implementation of + * AbstractManagedParentQueue. Eg: PlanQueue for reservations or + * ManagedParentQueue for auto created dynamic queues */ -public class ReservationQueue extends LeafQueue { +public class AutoCreatedLeafQueue extends LeafQueue { private static final Logger LOG = LoggerFactory - .getLogger(ReservationQueue.class); + .getLogger(AutoCreatedLeafQueue.class); - private PlanQueue parent; + private AbstractManagedParentQueue parent; - public ReservationQueue(CapacitySchedulerContext cs, String queueName, - PlanQueue parent) throws IOException { + public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, + AbstractManagedParentQueue parent) throws IOException { super(cs, queueName, parent, null); - // the following parameters are common to all reservation in the plan - updateQuotas(parent.getUserLimitForReservation(), + + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), parent.getUserLimitFactor(), - parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); + 
parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + this.parent = parent; } @@ -55,21 +55,18 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { try { writeLock.lock(); - // Sanity check - if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } + + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, null); - updateQuotas(parent.getUserLimitForReservation(), + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), parent.getUserLimitFactor(), - parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); + parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + } finally { writeLock.unlock(); } @@ -77,10 +74,10 @@ public void reinitialize(CSQueue newlyParsedQueue, /** * This methods to change capacity for a queue and adjusts its - * absoluteCapacity - * + * absoluteCapacity. + * * @param entitlement the new entitlement for the queue (capacity, - * maxCapacity, etc..) 
+ * maxCapacity) * @throws SchedulerDynamicEditException */ public void setEntitlement(QueueEntitlement entitlement) @@ -94,8 +91,6 @@ public void setEntitlement(QueueEntitlement entitlement) } setCapacity(capacity); setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - // note: we currently set maxCapacity to capacity - // this might be revised later setMaxCapacity(entitlement.getMaxCapacity()); if (LOG.isDebugEnabled()) { LOG.debug("successfully changed to " + capacity + " for queue " + this @@ -106,12 +101,14 @@ public void setEntitlement(QueueEntitlement entitlement) } } - private void updateQuotas(int userLimit, float userLimitFactor, - int maxAppsForReservation, int maxAppsPerUserForReservation) { - setUserLimit(userLimit); - setUserLimitFactor(userLimitFactor); - setMaxApplications(maxAppsForReservation); - maxApplicationsPerUser = maxAppsPerUserForReservation; + private void validate(final CSQueue newlyParsedQueue) throws IOException { + if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Error trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + } @Override @@ -119,4 +116,14 @@ protected void setupConfigurableCapacities() { CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); } + + private void updateApplicationAndUserLimits(int userLimit, + float userLimitFactor, + int maxAppsForAutoCreatedQueues, + int maxAppsPerUserForAutoCreatedQueues) { + setUserLimit(userLimit); + setUserLimitFactor(userLimitFactor); + setMaxApplications(maxAppsForAutoCreatedQueues); + setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d91aa55a487..e4e372e019b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -64,11 +64,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; -import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; 
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -138,6 +138,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -171,6 +173,8 @@ private int maxAssignPerHeartbeat; + private final Clock clock = new UTCClock(); + private CSConfigurationProvider csConfProvider; @Override @@ -454,6 +458,11 @@ long getAsyncScheduleInterval() { private final static Random random = new Random(System.currentTimeMillis()); + @Override + public Clock getClock() { + return clock; + } + /** * Schedule on all nodes by starting at a random point. 
* @param cs @@ -560,44 +569,17 @@ public int getPendingBacklogs() { } @VisibleForTesting - public UserGroupMappingPlacementRule - getUserGroupMappingPlacementRule() throws IOException { + public PlacementRule getUserGroupMappingPlacementRule() throws IOException { try { readLock.lock(); - boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); - LOG.info( - "Initialized queue mappings, override: " + overrideWithQueueMappings); - - // Get new user/group mappings - List newMappings = conf.getQueueMappings(); - // check if mappings refer to valid queues - for (QueueMapping mapping : newMappings) { - String mappingQueue = mapping.getQueue(); - if (!mappingQueue.equals( - UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue - .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = getQueue(mappingQueue); - if (queue == null || !(queue instanceof LeafQueue)) { - throw new IOException( - "mapping contains invalid or non-leaf queue " + mappingQueue); - } - } - } - - // initialize groups if mappings are present - if (newMappings.size() > 0) { - Groups groups = new Groups(conf); - return new UserGroupMappingPlacementRule(overrideWithQueueMappings, - newMappings, groups); - } - - return null; + return UserGroupMappingPlacementRule.get(this); } finally { readLock.unlock(); } } - private void updatePlacementRules() throws IOException { + @VisibleForTesting + void updatePlacementRules() throws IOException { // Initialize placement rules Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); @@ -731,28 +713,56 @@ private void addApplicationOnRecovery( } } - private void addApplication(ApplicationId applicationId, - String queueName, String user, Priority priority) { + private void addApplication(ApplicationId applicationId, String queueName, + String user, Priority priority) { try { writeLock.lock(); if (isSystemAppsLimitReached()) { String message = "Maximum system application 
limit reached,"
             + "cannot accept submission of application: " + applicationId;
-        this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
-            applicationId, RMAppEventType.APP_REJECTED, message));
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                message));
         return;
       }
       // Sanity checks.
       CSQueue queue = getQueue(queueName);
+
       if (queue == null) {
-        String message =
-            "Application " + applicationId + " submitted by user " + user
-                + " to unknown queue: " + queueName;
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
-                message));
-        return;
+        //Could be a potential auto-created leaf queue
+        try {
+          LeafQueue autoCreatedLeafQueue = autoCreateLeafQueue(applicationId);
+          if (autoCreatedLeafQueue == null) {
+            final String message =
+                "Application " + applicationId + " submitted by user " + user
+                    + " to unknown queue: " + queueName;
+
+            this.rmContext.getDispatcher().getEventHandler().handle(
+                new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                    message));
+            // Stop here: falling through would reach the non-leaf check
+            // below with a null queue and dispatch a second APP_REJECTED.
+            return;
+          }
+          queue = autoCreatedLeafQueue;
+
+        } catch (YarnException | IOException e) {
+          // Both failure types are reported identically to the application.
+          LOG.error("Could not auto-create leaf queue due to : ", e);
+          final String message =
+              "Application " + applicationId + " submission by user " + user
+                  + " to queue: " + queueName + " failed : " + e.getMessage();
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          // The application has been rejected; do not continue to submit it.
+          return;
+        }
       }
+
       if (!(queue instanceof LeafQueue)) {
         String message =
"Application " + applicationId + " submitted by user " + user @@ -761,7 +771,37 @@ private void addApplication(ApplicationId applicationId, new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); return; + } else if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + RMApp rmApp = this.rmContext.getRMApps().get(applicationId); + ApplicationPlacementContext placementContext = + rmApp.getApplicationQueuePlacementContext(); + + if (placementContext == null) { + String message = + "Application " + applicationId + " submission by user " + user + + " to queue: " + queueName + " failed : " + + "Queue mapping does not exist for user"; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } else if (!queue.getParent().getQueueName().equals( + placementContext.getParentQueue())) { + String message = + "Auto created Leaf queue " + placementContext.getQueue() + " already exists under " + queue + .getParent().getQueuePath() + + ".But Queue mapping has a different parent queue " + + placementContext.getParentQueue() + + " for the specified user : " + user; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } } + // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); @@ -1726,7 +1766,7 @@ public FiCaSchedulerNode getNode(NodeId nodeId) { @Override @Lock(Lock.NoLock.class) - public void recover(RMState state) throws Exception { + public void recover(RMStateStore.RMState state) throws Exception { // NOT IMPLEMENTED } @@ -1921,12 +1961,12 @@ public void removeQueue(String queueName) writeLock.lock(); LOG.info("Removing queue: " + queueName); CSQueue q = this.getQueue(queueName); - if (!(q instanceof ReservationQueue)) { + if (!(q instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "The 
queue that we are asked " + "to remove (" + queueName - + ") is not a ReservationQueue"); + + ") is not a AutoCreatedLeafQueue"); } - ReservationQueue disposableLeafQueue = (ReservationQueue) q; + AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q; // at this point we should have no more apps if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException( @@ -1936,9 +1976,11 @@ public void removeQueue(String queueName) + " pending apps"); } - ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + ((AbstractManagedParentQueue) disposableLeafQueue.getParent()) + .removeChildQueue(q); this.queueManager.removeQueue(queueName); - LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); + LOG.info("Removal of AutoCreatedLeafQueue " + + queueName + " has succeeded"); } finally { writeLock.unlock(); } @@ -1949,25 +1991,27 @@ public void addQueue(Queue queue) throws SchedulerDynamicEditException { try { writeLock.lock(); - if (!(queue instanceof ReservationQueue)) { + if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( - "Queue " + queue.getQueueName() + " is not a ReservationQueue"); + "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue"); } - ReservationQueue newQueue = (ReservationQueue) queue; + AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; - if (newQueue.getParent() == null || !(newQueue - .getParent() instanceof PlanQueue)) { + if (newQueue.getParent() == null + || !(AbstractManagedParentQueue.class. 
+ isAssignableFrom(newQueue.getParent().getClass()))) { throw new SchedulerDynamicEditException( "ParentQueue for " + newQueue.getQueueName() - + " is not properly set (should be set and be a PlanQueue)"); + + " is not properly set (should be set and be a PlanQueue or AutoCreateEnableParentQueue)"); } - PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); + AbstractManagedParentQueue parentPlan = + (AbstractManagedParentQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); - LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); } @@ -1981,21 +2025,22 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); ParentQueue parent = (ParentQueue) queue.getParent(); - if (!(queue instanceof ReservationQueue)) { + if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( "Entitlement can not be" + " modified dynamically since queue " - + inQueue + " is not a ReservationQueue"); + + inQueue + " is not a AutoCreatedLeafQueue"); } - if (!(parent instanceof PlanQueue)) { + if (parent == null + || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) { throw new SchedulerDynamicEditException( - "The parent of ReservationQueue " + inQueue - + " must be an PlanQueue"); + "The parent of AutoCreatedLeafQueue " + inQueue + + " must be a PlanQueue/AutoCreateEnabledParentQueue"); } - ReservationQueue newQueue = (ReservationQueue) queue; + AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; - float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); + float sumChilds = parent.sumOfChildCapacities(); float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity(); @@ -2010,12 +2055,13 @@ public 
void setEntitlement(String inQueue, QueueEntitlement entitlement)
         newQueue.setEntitlement(entitlement);
       } else{
         throw new SchedulerDynamicEditException(
-            "Sum of child queues would exceed 100% for PlanQueue: " + parent
-                .getQueueName());
+            "Sum of child queues would exceed 100% for auto creating parent "
+                + "queue : " + parent.getQueueName());
       }
       LOG.info(
-          "Set entitlement for ReservationQueue " + inQueue + " to " + queue
-              .getCapacity() + " request was (" + entitlement.getCapacity()
+          "Set entitlement for AutoCreatedLeafQueue " + inQueue
+              + " to " + queue.getCapacity()
+              + " request was (" + entitlement.getCapacity()
               + ")");
     } finally {
       writeLock.unlock();
@@ -2631,4 +2677,48 @@ public MutableConfigurationProvider getMutableConfProvider() {
     }
     return null;
   }
+
+  /**
+   * Creates an AutoCreatedLeafQueue for the application's placement context
+   * and registers it through {@code addQueue}.
+   *
+   * @param applicationId application whose placement context names the leaf
+   *                      queue and the parent queue to create it under
+   * @return the newly created leaf queue, or null when the application has
+   *         no placement context
+   * @throws IOException if constructing the leaf queue fails
+   * @throws YarnException if the placement context names no parent queue, or
+   *           the parent is not a ManagedParentQueue with auto creation
+   *           enabled
+   */
+  private LeafQueue autoCreateLeafQueue(final ApplicationId applicationId)
+      throws IOException, YarnException {
+
+    AutoCreatedLeafQueue autoCreatedLeafQueue = null;
+    RMApp rmApp = this.rmContext.getRMApps().get(applicationId);
+    ApplicationPlacementContext placementContext =
+        rmApp.getApplicationQueuePlacementContext();
+
+    if (placementContext != null) {
+
+      String leafQueueName = placementContext.getQueue();
+      String parentQueueName = placementContext.getParentQueue();
+
+      if (!StringUtils.isEmpty(parentQueueName)) {
+        CSQueue parentQueue = getQueue(parentQueueName);
+
+        // instanceof rejects both a missing parent (null) and a parent of
+        // another queue type, so the cast below cannot fail.
+        if (parentQueue instanceof ManagedParentQueue
+            && conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
+
+          ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) parentQueue;
+          autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
+              autoCreateEnabledParentQueue);
+
+          addQueue(autoCreatedLeafQueue);
+
+          //TODO - Set entitlement through capacity management policy
+        } else{
+          throw new SchedulerDynamicEditException(
+              "Could not auto-create leaf queue for " + leafQueueName + ". 
Queue mapping specifies an invalid parent queue which does not exist " + + parentQueueName); + } + } else{ + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping does not specify which parent queue it needs to be created under."); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue mapping does not exist for application " + applicationId + " submitted by user : " + rmApp.getUser()); + } + } + return autoCreatedLeafQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ecf5f1..4c2641b18f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; @@ -906,6 +907,11 @@ public boolean getOverrideWithQueueMappings() { DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); } + @VisibleForTesting + public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) { + 
setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings); + } + /** * Returns a collection of strings, trimming leading and trailing whitespeace * on each value @@ -980,6 +986,34 @@ public boolean getOverrideWithQueueMappings() { return mappings; } + @VisibleForTesting + public void setQueuePlacementRules(Collection queuePlacementRules) { + if (queuePlacementRules == null) { + return; + } + String str = StringUtils.join(",", queuePlacementRules); + setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str); + } + + public Collection getQueuePlacementRules() { + return getStringCollection( + YarnConfiguration.QUEUE_PLACEMENT_RULES); + } + + @VisibleForTesting + public void setQueueMappings(List queueMappings) { + if (queueMappings == null) { + return; + } + + List queueMappingStrs = new ArrayList<>(); + for (QueueMapping mapping : queueMappings) { + queueMappingStrs.add(mapping.toString()); + } + + setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); + } + public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); @@ -1522,4 +1556,132 @@ public long getDefaultLifetimePerQueue(String queue) { public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); } + + @Private + public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = + "auto-create-child-queue.enabled"; + + public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = + "leaf-queue-template"; + + @Private + public static final String AUTO_CREATE_QUEUE_MAX_QUEUES = + "auto-create-child-queue.max-queues"; + + @Private + public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000; + + /** + * If true, this queue will be created as a Parent Queue which Auto Created leaf child queues + * @param queuePath The queues path + * 
@return true if auto create is enabled for child queues else false. Default is false + */ + public boolean isAutoCreateChildQueueEnabled(String queuePath) { + boolean isAutoCreateEnabled = + getBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED); + return isAutoCreateEnabled; + } + + @VisibleForTesting + public void setAutoCreateChildQueueEnabled(String queuePath, boolean autoCreationEnabled) { + setBoolean(getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED, autoCreationEnabled); + } + + /** + * Get the auto created leaf queue's minimum guaranteed capacity. + * Leaf queue's template capacities are configured at the parent queue + * @param queuePath The parent queue's path + * @return the leaf queue's template capacity + */ + public float getAutoCreatedLeafQueueTemplateCapacity(String queuePath) { + return + getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + CAPACITY, + (float) UNDEFINED); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, float capacity) { + setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + + CAPACITY, + capacity); + } + + /** + * Get the auto created leaf queue's maximum capacity + * @param queuePath The parent queue's path + * @return the leaf queue's template maximum capacity, + * configured at the parent queue + */ + public float getAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath) { + return + getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + MAXIMUM_CAPACITY, + (float) UNDEFINED); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, float maxCapacity) { + setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + + MAXIMUM_CAPACITY, + maxCapacity); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxApplicationsPerQueue(String queue, int maxApplicationsPerQueue) { + setInt(getAutoCreatedQueueTemplateConfPrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX, + 
maxApplicationsPerQueue); + } + + /** + * Get the auto created leaf queue's template configuration prefix + * Leaf queue's template capacities are configured at the parent queue + * @param queuePath parent queue's path + * @return Config prefix for leaf queue template configurations + */ + public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) { + return getQueuePrefix(queuePath) + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX + DOT; + } + + @Private + public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = + "auto-create-child-queue.fail-on-exceeding-parent-capacity"; + + @Private + public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = false; + + /** + * Fail further auto leaf queue creation when parent's guaranteed capacity is + * exceeded. + * @param parentQueuePath the parent queue's path + * @return true if configured to fail + * else false + */ + public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(String parentQueuePath) { + boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity = + getBoolean(getQueuePrefix(parentQueuePath) + + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, + DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY); + return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity; + } + + @VisibleForTesting + public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(String queuePath, boolean autoCreationEnabled) { + setBoolean(getQueuePrefix(queuePath) + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, autoCreationEnabled); + } + + /** + * Get the max number of leaf queues that are allowed to be created under + * a parent queue + * @param queuePath the parent queue's path + * @return the max number of leaf queues allowed to be auto created + */ + public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { + return + getInt(getAutoCreatedQueueTemplateConfPrefix(queuePath) + AUTO_CREATE_QUEUE_MAX_QUEUES, + DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES); + } } diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7c918a53620..f2ef9d53685 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -94,4 +95,7 @@ * @return if configuration is mutable */ boolean isConfigurationMutable(); + + Clock getClock(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 48c289f0cde..0f8ff958ba2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.Clock; /** * @@ -154,7 +155,7 @@ public void setCapacitySchedulerContext( * @throws IOException if fails to initialize queues */ public void initializeQueues(CapacitySchedulerConfiguration conf) - throws IOException { + throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, appPriorityACLManager, queues); @@ -176,7 +177,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) if (!csContext.isConfigurationMutable() || csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { - // Ensure queue hiearchy in the new XML file is proper. + // Ensure queue hierarchy in the new XML file is proper. validateQueueHierarchy(queues, newQueues); } @@ -221,6 +222,7 @@ static CSQueue parseQueue( : (parent.getQueuePath() + "." 
+ queueName); String[] childQueueNames = conf.getQueues(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName); + boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { if (null == parent) { throw new IllegalStateException( @@ -238,7 +240,7 @@ static CSQueue parseQueue( queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; List childQueues = new ArrayList<>(); - ReservationQueue resQueue = new ReservationQueue(csContext, + AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext, defReservationId, (PlanQueue) queue); try { resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f)); @@ -249,11 +251,14 @@ static CSQueue parseQueue( ((PlanQueue) queue).setChildQueues(childQueues); queues.put(defReservationId, resQueue); + } else if (isAutoCreateEnabled) { + queue = new ManagedParentQueue(csContext, queueName, + parent, oldQueues.get(queueName)); + } else { queue = new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName)); - // Used only for unit tests queue = hook.hook(queue); } @@ -262,9 +267,16 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + + ParentQueue parentQueue; + if (isAutoCreateEnabled) { + parentQueue = new ManagedParentQueue(csContext, queueName, + parent, oldQueues.get(queueName)); + } else { + parentQueue = + new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); @@ -277,6 +289,7 @@ static CSQueue parseQueue( childQueues.add(childQueue); } parentQueue.setChildQueues(childQueues); + } if (queue instanceof LeafQueue && queues.containsKey(queueName) @@ -303,7 +316,7 @@ private void validateQueueHierarchy(Map queues, Map newQueues) throws IOException { // check that all 
static queues are included in the newQueues list for (Map.Entry e : queues.entrySet()) { - if (!(e.getValue() instanceof ReservationQueue)) { + if (!(e.getValue() instanceof AutoCreatedLeafQueue)) { String queueName = e.getKey(); CSQueue oldQueue = e.getValue(); CSQueue newQueue = newQueues.get(queueName); @@ -323,6 +336,15 @@ private void validateQueueHierarchy(Map queues, throw new IOException(queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() + " after refresh, which is not allowed."); + } else if (oldQueue instanceof ParentQueue && !(oldQueue instanceof ManagedParentQueue) && newQueue + instanceof ManagedParentQueue) { + throw new IOException("Can not convert parent queue: " + + oldQueue.getQueuePath() + " to auto create enabled parent queue since " + + "it could have other pre-configured queues which is not supported"); + } else if (oldQueue instanceof ManagedParentQueue && !(newQueue + instanceof ManagedParentQueue)) { + throw new IOException("Can not convert auto create enabled parent queue: " + + oldQueue.getQueuePath() + " to parent queue with pre-configured queues"); } else if (oldQueue instanceof LeafQueue && newQueue instanceof ParentQueue) { if (oldQueue.getState() == QueueState.STOPPED) { @@ -363,7 +385,10 @@ private void updateQueues(Map existingQueues, .iterator(); itr.hasNext();) { Map.Entry e = itr.next(); String queueName = e.getKey(); - if (!newQueues.containsKey(queueName)) { + CSQueue existingQueue = e.getValue(); + + //TODO - Handle case when auto create is disabled on parent queues but present in configuration + if (!newQueues.containsKey(queueName) && !(existingQueue instanceof AutoCreatedLeafQueue)) { itr.remove(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f24e30aa1ee..1a606dc7022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1997,6 +1997,10 @@ public void setAbsoluteCapacity(float absoluteCapacity) { queueCapacities.setAbsoluteCapacity(absoluteCapacity); } + public void setMaxApplicationsPerUser(int maxApplications) { + this.maxApplicationsPerUser = maxApplications; + } + public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java new file mode 100644 index 00000000000..1b3f188163e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Auto Creation enabled Parent queue. This queue initially does not have any children to start with and all child + * leaf queues will be auto created. Currently this does not allow other pre-configured leaf or parent queues to + * co-exist along with auto-created leaf queues. The auto creation is limited to leaf queues currently. 
+ */ +public class ManagedParentQueue extends AbstractManagedParentQueue { + + private float autoCreatedLeafQueueCapacity = 0.0f; + private float autoCreatedLeafQueueMaxCapacity = 0.0f; + + private float autoCreatedLeafQueueAbsoluteCapacity = 0.0f; + private float autoCreatedLeafQueueAbsoluteMaxCapacity = 0.0f; + + private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false; + + private static final Logger LOG = LoggerFactory.getLogger(ManagedParentQueue.class); + + public ManagedParentQueue(final CapacitySchedulerContext cs, final String queueName, final CSQueue parent, final CSQueue old) throws IOException { + super(cs, queueName, parent, old); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); + } + + @Override + protected void initializeLeafQueueConfigs() { + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + float autoCreatedLeafQueueMaxCapacity = conf.getAutoCreatedLeafQueueTemplateMaxCapacity + (getQueuePath()) / 100; + float autoCreatedLeafQueueCapacity = conf.getAutoCreatedLeafQueueTemplateCapacity(getQueuePath()) / 100; + + this.autoCreatedLeafQueueCapacity = autoCreatedLeafQueueCapacity; + this.autoCreatedLeafQueueMaxCapacity = autoCreatedLeafQueueMaxCapacity; + autoCreatedLeafQueueAbsoluteCapacity = + autoCreatedLeafQueueCapacity * getAbsoluteCapacity(); + autoCreatedLeafQueueAbsoluteMaxCapacity = + autoCreatedLeafQueueMaxCapacity * getAbsoluteMaximumCapacity(); + + this.shouldFailAutoCreationWhenGuaranteedCapacityExceeded = conf + .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(getQueuePath()); + + super.initializeLeafQueueConfigs(); + } + + protected void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue + 
.getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + } + + @Override + public void addChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + + if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException("Expected child queue to be an instance of AutoCreatedLeafQueue"); + } + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) childQueue.getParent(); + + String leafQueueName = childQueue.getQueueName(); + int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit( + parentQueue.getQueuePath()); + + if (parentQueue.getChildQueues().size() >= maxQueues) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ".Max Child " + + "Queue limit exceeded which is configured as : " + maxQueues + + " and number of child queues is : " + parentQueue + .getChildQueues().size()); + } + + if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) { + if (parentQueue.getAutoCreatedLeafQueueAbsoluteCapacity() + parentQueue + .sumOfChildAbsCapacities() > parentQueue.getAbsoluteCapacity()) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ". 
Child " + + "queues capacities have reached parent queue : " + + parentQueue.getQueuePath() + " guaranteed capacity"); + } + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + super.addChildQueue(leafQueue); + //TODO - refresh policy queue after capacity management is added + + } finally { + writeLock.unlock(); + } + } + + public float getAutoCreatedLeafQueueCapacity() { + return autoCreatedLeafQueueCapacity; + } + + public float getAutoCreatedLeafQueueMaxCapacity() { + return autoCreatedLeafQueueMaxCapacity; + } + + public float getAutoCreatedLeafQueueAbsoluteCapacity() { + return autoCreatedLeafQueueAbsoluteCapacity; + } + + public float getAutoCreatedLeafQueueAbsoluteMaxCapacity() { + return autoCreatedLeafQueueAbsoluteMaxCapacity; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74f8d4..36d7c0c6a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1080,4 +1080,30 @@ public void stopQueue() { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } + + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } + + protected float sumOfChildAbsCapacities() { + try { 
+ writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getAbsoluteCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 882262fafcc..d4444105de7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,11 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,189 +31,48 @@ * reservations, but functionality wise is a sub-class of ParentQueue * */ -public class PlanQueue extends ParentQueue { +public class PlanQueue extends AbstractManagedParentQueue { private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); - private int maxAppsForReservation; - private int maxAppsPerUserForReservation; - private int userLimit; - private float userLimitFactor; - protected CapacitySchedulerContext schedulerContext; private boolean showReservationsAsQueues; public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) 
throws IOException { super(cs, queueName, parent, old); - - this.schedulerContext = cs; - // Set the reservation queue attributes for the Plan - CapacitySchedulerConfiguration conf = cs.getConfiguration(); - String queuePath = super.getQueuePath(); - int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); - showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); - if (maxAppsForReservation < 0) { - maxAppsForReservation = - (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super - .getAbsoluteCapacity()); - } - int userLimit = conf.getUserLimit(queuePath); - float userLimitFactor = conf.getUserLimitFactor(queuePath); - int maxAppsPerUserForReservation = - (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); - updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, - maxAppsPerUserForReservation); - - StringBuffer queueInfo = new StringBuffer(); - queueInfo.append("Created Plan Queue: ").append(queueName) - .append("\nwith capacity: [").append(super.getCapacity()) - .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) - .append("\nwith max reservation apps: [").append(maxAppsForReservation) - .append("]\nwith max reservation apps per user: [") - .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") - .append(userLimit).append("]\nwith user limit factor: [") - .append(userLimitFactor).append("]."); - LOG.info(queueInfo.toString()); } @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { - try { - writeLock.lock(); - // Sanity check - if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } - - PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - - if (newlyParsedParentQueue.getChildQueues().size() != 1) 
{ - throw new IOException( - "Reservable Queue should not have sub-queues in the" - + "configuration expect the default reservation queue"); - } - - // Set new configs - setupQueueConfigs(clusterResource); - - updateQuotas(newlyParsedParentQueue.userLimit, - newlyParsedParentQueue.userLimitFactor, - newlyParsedParentQueue.maxAppsForReservation, - newlyParsedParentQueue.maxAppsPerUserForReservation); - - // run reinitialize on each existing queue, to trigger absolute cap - // recomputations - for (CSQueue res : this.getChildQueues()) { - res.reinitialize(res, clusterResource); - } - showReservationsAsQueues = - newlyParsedParentQueue.showReservationsAsQueues; - } finally { - writeLock.unlock(); - } + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); } - void addChildQueue(CSQueue newQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (newQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + newQueue + " being added has non zero capacity."); - } - boolean added = this.childQueues.add(newQueue); - if (LOG.isDebugEnabled()) { - LOG.debug("updateChildQueues (action: add queue): " + added + " " - + getChildQueuesToPrint()); - } - } finally { - writeLock.unlock(); - } + @Override + protected void initializeLeafQueueConfigs() { + String queuePath = super.getQueuePath(); + showReservationsAsQueues = csContext.getConfiguration() + .getShowReservationAsQueues(queuePath); + super.initializeLeafQueueConfigs(); } - void removeChildQueue(CSQueue remQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (remQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + remQueue + " being removed has non zero capacity."); - } - Iterator qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if 
(cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); - } - } - } - } finally { - writeLock.unlock(); + protected void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); } - } - protected float sumOfChildCapacities() { - try { - writeLock.lock(); - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); - } - return ret; - } finally { - writeLock.unlock(); - } - } + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - private void updateQuotas(int userLimit, float userLimitFactor, - int maxAppsForReservation, int maxAppsPerUserForReservation) { - this.userLimit = userLimit; - this.userLimitFactor = userLimitFactor; - this.maxAppsForReservation = maxAppsForReservation; - this.maxAppsPerUserForReservation = maxAppsPerUserForReservation; - } - - /** - * Number of maximum applications for each of the reservations in this Plan. - * - * @return maxAppsForreservation - */ - public int getMaxApplicationsForReservations() { - return maxAppsForReservation; - } - - /** - * Number of maximum applications per user for each of the reservations in - * this Plan. - * - * @return maxAppsPerUserForreservation - */ - public int getMaxApplicationsPerUserForReservation() { - return maxAppsPerUserForReservation; - } - - /** - * User limit value for each of the reservations in this Plan. - * - * @return userLimit - */ - public int getUserLimitForReservation() { - return userLimit; - } - - /** - * User limit factor value for each of the reservations in this Plan. 
- * - * @return userLimitFactor - */ - public float getUserLimitFactor() { - return userLimitFactor; + if (newlyParsedParentQueue.getChildQueues().size() != 1) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration expect the default reservation queue"); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 399df02465e..10b9ced2f28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -243,6 +244,11 @@ public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not 
supported yet."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index 61bc8d9be2c..a6cdeb61f39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -58,8 +58,8 @@ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, ApplicationSubmissionContext asc = Records.newRecord(ApplicationSubmissionContext.class); asc.setQueue(inputQueue); - String queue = rule.getQueueForApp(asc, inputUser); - Assert.assertEquals(expectedQueue, queue); + ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser); + Assert.assertEquals(expectedQueue, ctx != null ? 
ctx.getQueue() : inputQueue); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 39a7f995ab6..ad6c584b04a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -327,6 +328,11 @@ public boolean isAppInCompletedStates() { return false; } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + return null; + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java similarity index 70% rename from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java index e23e93c99dd..b403e724533 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java @@ -36,15 +36,19 @@ import org.junit.Before; import org.junit.Test; -public class TestReservationQueue { +/** + * Test class for dynamic auto created leaf queues. + * @see AutoCreatedLeafQueue + */ +public class TestAutoCreatedLeafQueue { - CapacitySchedulerConfiguration csConf; - CapacitySchedulerContext csContext; + private CapacitySchedulerConfiguration csConf; + private CapacitySchedulerContext csContext; final static int DEF_MAX_APPS = 10000; final static int GB = 1024; private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - ReservationQueue reservationQueue; + private AutoCreatedLeafQueue autoCreatedLeafQueue; @Before public void setup() throws IOException { @@ -61,49 +65,48 @@ public void setup() throws IOException { when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - RMContext mockRMContext = TestUtils.getMockRMContext(); when(csContext.getRMContext()).thenReturn(mockRMContext); // create a queue PlanQueue pq = new PlanQueue(csContext, "root", null, null); - reservationQueue = new 
ReservationQueue(csContext, "a", pq); + autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq); } - private void validateReservationQueue(double capacity) { - assertTrue(" actual capacity: " + reservationQueue.getCapacity(), - reservationQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); - assertEquals(reservationQueue.maxApplications, DEF_MAX_APPS); - assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS); + private void validateAutoCreatedLeafQueue(double capacity) { + assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(), + autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); + assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS); + assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS); } @Test public void testAddSubtractCapacity() throws Exception { // verify that setting, adding, subtracting capacity works - reservationQueue.setCapacity(1.0F); - validateReservationQueue(1); - reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); - validateReservationQueue(0.9); - reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); - validateReservationQueue(1); - reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); - validateReservationQueue(0); + autoCreatedLeafQueue.setCapacity(1.0F); + validateAutoCreatedLeafQueue(1); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); + validateAutoCreatedLeafQueue(0.9); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f)); + validateAutoCreatedLeafQueue(1); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f)); + validateAutoCreatedLeafQueue(0); try { - reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); fail(); } catch (SchedulerDynamicEditException iae) { // expected - validateReservationQueue(1); + validateAutoCreatedLeafQueue(1); } try { - reservationQueue.setEntitlement(new 
QueueEntitlement(-0.1f, 1f)); + autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); fail(); } catch (SchedulerDynamicEditException iae) { // expected - validateReservationQueue(1); + validateAutoCreatedLeafQueue(1); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1dea4eea75f..a19138dd7a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -103,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -904,7 +907,7 @@ void checkQueueCapacities(CapacityScheduler cs, (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f); } - private void checkQueueCapacity(CSQueue q, float expectedCapacity, + void checkQueueCapacity(CSQueue q, float expectedCapacity, float expectedAbsCapacity, float expectedMaxCapacity, float expectedAbsMaxCapacity) { final float epsilon = 1e-5f; @@ -917,7 +920,7 @@ private void checkQueueCapacity(CSQueue q, float expectedCapacity, q.getAbsoluteMaximumCapacity(), epsilon); } - private CSQueue findQueue(CSQueue root, String queuePath) { + CSQueue findQueue(CSQueue root, String queuePath) { if (root.getQueuePath().equals(queuePath)) { return root; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java new file mode 100644 index 00000000000..70728abddde --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -0,0 +1,695 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for creation and reinitialization of auto created leaf queues + * under a ManagedParentQueue. 
+ */ +public class TestCapacitySchedulerAutoQueueCreation { + + private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private final int GB = 1024; + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + private static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + private static final String A1 = A + ".a1"; + private static final String A2 = A + ".a2"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static final String C1 = C + ".c1"; + private static final String C2 = C + ".c2"; + private static final String C3 = C + ".c3"; + private static float A_CAPACITY = 20f; + private static float B_CAPACITY = 40f; + private static float C_CAPACITY = 20f; + private static float D_CAPACITY = 20f; + private static float A1_CAPACITY = 30; + private static float A2_CAPACITY = 70; + private static float B1_CAPACITY = 60f; + private static float B2_CAPACITY = 20f; + private static float B3_CAPACITY = 20f; + private static float C1_CAPACITY = 20f; + private static float C2_CAPACITY = 20f; + + final String USER = "user_"; + final String USER0 = USER + 0; + final String USER2 = USER + 2; + final String PARENT_QUEUE = "c"; + + private MockRM mockRM = null; + + private CapacityScheduler cs; + + private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + + @Before + public void setUp() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List queuePlacementRules = new ArrayList(); + 
queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + setupQueueMappings(conf); + + mockRM = new MockRM(conf); + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + + cs.start(); + } + + private CapacitySchedulerConfiguration setupQueueMappings( + CapacitySchedulerConfiguration conf) { + + //set queue mapping + List queueMappings = + new ArrayList<>(); + for (int i = 0; i <= 3; i++) { + //Set C as parent queue name for auto queue creation + UserGroupMappingPlacementRule.QueueMapping userQueueMapping = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, + USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); + queueMappings.add(userQueueMapping); + } + + conf.setQueueMappings(queueMappings); + //override with queue mappings + conf.setOverrideWithQueueMappings(true); + return conf; + } + + /** + * @param conf, to be modified + * @return, CS configuration which has C as an auto creation enabled parent queue + *

+ * root + * / \ \ \ + * a b c d + * / \ / | \ + * a1 a2 b1 b2 b3 + */ + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b", "c", "d" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + conf.setCapacity(C, C_CAPACITY); + conf.setCapacity(D, D_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] { "b1", "b2", "b3" }); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + + LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); + + conf.setUserLimitFactor(D, 1.0f); + conf.setAutoCreateChildQueueEnabled(D, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f); + + LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue"); + + return conf; + } + + @After + public void tearDown() throws Exception { + if (mockRM != null) { + mockRM.stop(); + } + } + + @Test + public void testAutoCreateLeafQueueCreation() throws Exception { + + try { + // submit an app + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, + 
null, USER0); + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + ApplicationPlacementContext placementContext = + app.getApplicationQueuePlacementContext(); + validatePlacementContext(placementContext, USER0, PARENT_QUEUE); + + assertNotNull(cs.getQueue(USER0)); + + AutoCreatedLeafQueue autoCreatedLeafQueue = + (AutoCreatedLeafQueue) cs.getQueue(USER0); + ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue("c"); + assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); + validateCapacities(autoCreatedLeafQueue); + } finally { + cleanupQueue(USER0); + } + } + + @Test + public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + // submit an app + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, + null, USER0); + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + ApplicationPlacementContext placementContext = + app.getApplicationQueuePlacementContext(); + validatePlacementContext(placementContext, USER0, PARENT_QUEUE); + + assertNotNull(cs.getQueue(USER0)); + + AutoCreatedLeafQueue autoCreatedLeafQueue = + (AutoCreatedLeafQueue) cs.getQueue(USER0); + ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue("c"); + assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); + validateCapacities(autoCreatedLeafQueue); + + ApplicationAttemptId appAttemptId = appsInC.get(0); + + Priority priority = TestUtils.createMockPriority(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory( + null); + ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId, Collections.singletonList(r1), + Collections.emptyList(), 
Collections.singletonList(host), + null, NULL_UPDATE_REQUESTS); + + //And this will result in container assignment for app1 + CapacityScheduler.schedule(cs); + + //change state to draining + autoCreatedLeafQueue.stopQueue(); + + cs.killAllAppsInQueue(USER0); + + mockRM.waitForState(appAttemptId, RMAppAttemptState.KILLED); + + mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED); + + //change state to stopped + autoCreatedLeafQueue.stopQueue(); + assertEquals(QueueState.STOPPED, + autoCreatedLeafQueue.getQueueInfo().getQueueState()); + + cs.reinitialize(cs.getConf(), mockRM.getRMContext()); + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + USER0); + validateCapacities(leafQueue); + + } finally { + cleanupQueue(USER0); + } + } + + @Test + public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception { + + MockRM mockRM = setupSchedulerInstance(); + try { + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + + // Test add one auto created queue dynamically and manually modify capacity + AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(cs, "c1", (ManagedParentQueue) cs.getQueue("c")); + cs.addQueue(c1); + c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f)); + + // Test add another auto created queue and use setEntitlement to modify + // capacity + AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(cs, "c2", (ManagedParentQueue) cs.getQueue("c")); + cs.addQueue(c2); + cs.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f)); + + // Verify all allocations match + checkQueueCapacities(cs, C_CAPACITY, D_CAPACITY); + + // Reinitialize and verify all dynamic queues survived + + conf.setCapacity(A, 20f); + conf.setCapacity(B, 20f); + conf.setCapacity(C, 40f); + conf.setCapacity(D, 20f); + cs.reinitialize(conf, mockRM.getRMContext()); + + checkQueueCapacities(cs, 40f, 20f); + + //change parent template configs 
and reinitialize + conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + cs.reinitialize(conf, mockRM.getRMContext()); + + ManagedParentQueue c = (ManagedParentQueue) cs.getQueue("c"); + AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(cs, "c3", c); + cs.addQueue(c3); + + c3.setEntitlement( + new QueueEntitlement(c.getAutoCreatedLeafQueueCapacity(), 1f)); + cs.reinitialize(conf, mockRM.getRMContext()); + + checkQueueCapacities(cs, 40f, 20f); + } finally { + if ( mockRM != null) { + ((CapacityScheduler) mockRM.getResourceScheduler()).stop(); + mockRM.stop(); + } + } + } + + @Test + public void testReinitializeFailsWithAutoCreateDisabledOnManagedParentQueue() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + + //TODO - should we allow this and retain all auto created leaf queues + // as normal leaf queues? + // what should be the behaviour in phase 1? 
+ newConf.setAutoCreateChildQueueEnabled(C, false); + + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + newCS.init(cs.getConf()); + newCS.start(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + fail( + "Expected exception while converting a auto create enabled parent queue to a parent queue"); + } catch (IOException e) { + //expected exception + } finally { + newCS.stop(); + } + } + + @Test + public void testConvertLeafQueueToParentQueueWithAutoCreate() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10); + newConf.setAutoCreateChildQueueEnabled(A1, true); + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + newCS.init(cs.getConf()); + newCS.start(); + + final LeafQueue a1Queue = (LeafQueue) newCS.getQueue("a1"); + a1Queue.stopQueue(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + } finally { + newCS.stop(); + } + } + + @Test + public void testConvertFailsFromParentQueueToManagedParentQueue() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10); + newConf.setAutoCreateChildQueueEnabled(A, true); + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + 
newCS.init(cs.getConf()); + newCS.start(); + + final ParentQueue a1Queue = (ParentQueue) newCS.getQueue("a"); + a1Queue.stopQueue(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + fail( + "Expected exception while converting a parent queue to an auto create enabled parent queue"); + } catch (IOException e) { + //expected exception + } finally { + newCS.stop(); + } + } + + @Test(timeout = 120000) + public void testAutoCreateLeafQueueFailsWithNoQueueMapping() + throws Exception { + + final String INVALID_USER = "invalid_user"; + + // submit an app under a different queue name which does not exist and queue mapping does not exist for this user + RMApp app = mockRM.submitApp(GB, "app", INVALID_USER, null, INVALID_USER, + false); + mockRM.drainEvents(); + mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app.getState()); + } + + private void validatePlacementContext( + ApplicationPlacementContext placementContext, String source, + String parentQueue) { + assertNotNull(placementContext); + assertEquals(placementContext.getParentQueue(), parentQueue); + assertEquals(placementContext.getQueue(), source); + + assertEquals(placementContext.getRule().getName(), + UserGroupMappingPlacementRule.class.getName()); + } + + private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { + assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, + EPSILON); + int maxAppsForAutoCreatedQueues = (int) ( + CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS + * 
autoCreatedLeafQueue.getParent().getAbsoluteCapacity()); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), + maxAppsForAutoCreatedQueues); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), + (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration() + .getUserLimitFactor( + autoCreatedLeafQueue.getParent().getQueuePath())))); + } + + private void cleanupQueue(String queueName) throws YarnException { + AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName); + if (queue != null) { + queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f)); + ((ManagedParentQueue) queue.getParent()).removeChildQueue( + queue.getQueueName()); + cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); + } else{ + throw new YarnException("Queue does not exist " + queueName); + } + } + + private ApplicationId submitApp(CSQueue parentQueue, String leafQueueName, + String user, int expectedNumAppsInParentQueue, + int expectedNumAppsInLeafQueue) throws Exception { + // submit an app + RMApp rmApp = mockRM.submitApp(GB, "test-auto-queue-activation", user, null, + leafQueueName); + + // check preconditions + List appsInParentQueue = cs.getAppsInQueue( + parentQueue.getQueueName()); + assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size()); + + List appsInLeafQueue = cs.getAppsInQueue( + leafQueueName); + assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size()); + + return rmApp.getApplicationId(); + } + + String getQueueMapping(String parentQueue, String leafQueue) { + return parentQueue + DOT + leafQueue; + } + + @Test(timeout = 120000) + public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping() + throws Exception { + + MockRM mockRM = setupSchedulerInstance(); + try { + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + + //"a" is not auto create enabled + try { + setupQueueMapping(cs, CURRENT_USER_MAPPING, "a", CURRENT_USER_MAPPING); + cs.updatePlacementRules(); 
+ fail("Expected invalid parent queue mapping failure"); + + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage().contains( + "invalid parent queue which does not have auto creation of leaf queues enabled [" + + "a" + "]")); + } + + //"INVALID_PARENT_QUEUE" is not a configured parent queue and app_user does not exist as a leaf queue + try { + setupQueueMapping(cs, "app_user", "INVALID_PARENT_QUEUE", "app_user"); + cs.updatePlacementRules(); + fail("Expected invalid parent queue mapping failure"); + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage() + .contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]")); + } + } finally { + if ( mockRM != null) { + ((CapacityScheduler) mockRM.getResourceScheduler()).stop(); + mockRM.stop(); + } + } + } + + @Test(timeout = 120000) + public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping() + throws Exception { + + MockRM mockRM = setupSchedulerInstance(); + try { + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + + setupQueueMapping(cs, CURRENT_USER_MAPPING, "c", CURRENT_USER_MAPPING); + cs.updatePlacementRules(); + + try { + setupQueueMapping(cs, CURRENT_USER_MAPPING, "", CURRENT_USER_MAPPING); + cs.updatePlacementRules(); + fail("Expected invalid parent queue mapping failure"); + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage().contains("invalid parent queue []")); + } + } finally { + if (mockRM != null) { + ((CapacityScheduler) mockRM.getResourceScheduler()).stop(); + mockRM.stop(); + } + } + } + + @Test(timeout = 10000) + public void testParentQueueUpdateInQueueMappingFailsAfterAutoCreation() + throws Exception { + + MockRM mockRM = setupSchedulerInstance(); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + try { + mockRM.start(); + cs.start(); + + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, null, + USER0); + + assertNotNull(cs.getQueue(USER0)); + + 
setupQueueMapping(cs, USER0, "d", USER0); + cs.updatePlacementRules(); + + app = mockRM.submitApp(GB, "test-auto-queue-creation-2", USER0, null, + USER0, false); + mockRM.drainEvents(); + mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app.getState()); + } finally { + if ( mockRM != null) { + ((CapacityScheduler) mockRM.getResourceScheduler()).stop(); + mockRM.stop(); + } + } + } + + private List setupQueueMapping( + CapacityScheduler cs, String user, String parentQueue, String queue) { + List queueMappings = + new ArrayList<>(); + queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user, + getQueueMapping(parentQueue, queue))); + cs.getConfiguration().setQueueMappings(queueMappings); + return queueMappings; + } + + private MockRM setupSchedulerInstance() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List queuePlacementRules = new ArrayList(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + setupQueueMappings(conf); + + MockRM mockRM = new MockRM(conf); + return mockRM; + } + + void checkQueueCapacities(CapacityScheduler cs, + float capacityC, float capacityD) { + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueC = tcs.findQueue(rootQueue, C); + CSQueue queueD = tcs.findQueue(rootQueue, D); + CSQueue queueC1 = tcs.findQueue(queueC, C1); + CSQueue queueC2 = tcs.findQueue(queueC, C2); + CSQueue queueC3 = tcs.findQueue(queueC, C3); + + float capC = capacityC / 100.0f; + float capD = capacityD / 100.0f; + + tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f, + 
(C1_CAPACITY/100.0f) * capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f, + (C2_CAPACITY/100.0f) * capC, 1.0f, 1.0f); + + if ( queueC3 != null ) { + float leafQueueTemplateCapacity = cs.getConfiguration() + .getAutoCreatedLeafQueueTemplateCapacity(C); + tcs.checkQueueCapacity(queueC3, + leafQueueTemplateCapacity/ 100.0f, + (leafQueueTemplateCapacity / 100.0f) * capC, 1.0f, 1.0f); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java index 9aba30c2e88..9425d5ea89b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -77,21 +77,21 @@ public void testRefreshQueuesWithReservations() throws Exception { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); //set default queue capacity to zero - ((ReservationQueue) cs + ((AutoCreatedLeafQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( new QueueEntitlement(0f, 1f)); // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 
1f)); // Test add another reservation queue and use setEntitlement to modify // capacity - ReservationQueue a2 = - new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a2 = + new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); cs.addQueue(a2); cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); @@ -113,8 +113,8 @@ public void testAddQueueFailCases() throws Exception { try { // Test invalid addition (adding non-zero size queue) - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); cs.addQueue(a1); fail(); @@ -123,11 +123,11 @@ public void testAddQueueFailCases() throws Exception { } // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 = + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); //set default queue capacity to zero - ((ReservationQueue) cs + ((AutoCreatedLeafQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( new QueueEntitlement(0f, 1f)); @@ -135,8 +135,8 @@ public void testAddQueueFailCases() throws Exception { // Test add another reservation queue and use setEntitlement to modify // capacity - ReservationQueue a2 = - new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a2 = + new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); cs.addQueue(a2); @@ -162,8 +162,8 @@ public void testRemoveQueue() throws Exception { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Test add one reservation dynamically and manually modify capacity - ReservationQueue a1 = - new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + AutoCreatedLeafQueue a1 
= + new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); @@ -230,8 +230,8 @@ public void testMoveAppToPlanQueue() throws Exception { // create the default reservation queue String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX; - ReservationQueue defQ = - new ReservationQueue(scheduler, defQName, + AutoCreatedLeafQueue defQ = + new AutoCreatedLeafQueue(scheduler, defQName, (PlanQueue) scheduler.getQueue("a")); scheduler.addQueue(defQ); defQ.setEntitlement(new QueueEntitlement(1f, 1f));