diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index 6dce9c76c91..c2969479c18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.Groups; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -30,7 +29,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -40,7 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +57,7 @@ private List mappings = null; private Groups groups; private CapacitySchedulerQueueManager queueManager; + private UserGroupPlacementContextFactory placementFactory = null; public UserGroupMappingPlacementRule(){ this(false, null, null); @@ -72,114 +70,6 @@ public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, this.groups = groups; } - private String getSecondaryGroup(String user) throws IOException { - List groupsList = groups.getGroups(user); - String secondaryGroup = null; - // Traverse all secondary groups (as there could be more than one - // and position is not guaranteed) and ensure there is queue with - // the same name - for (int i = 1; i < groupsList.size(); i++) { - if (this.queueManager.getQueue(groupsList.get(i)) != null) { - secondaryGroup = groupsList.get(i); - break; - } - } - - if (secondaryGroup == null && LOG.isDebugEnabled()) { - LOG.debug("User {} is not associated with any Secondary " - + "Group. Hence it may use the 'default' queue", user); - } - return secondaryGroup; - } - - private ApplicationPlacementContext getPlacementForUser(String user) - throws IOException { - for (QueueMapping mapping : mappings) { - if (mapping.getType() == MappingType.USER) { - if (mapping.getSource().equals(CURRENT_USER_MAPPING)) { - if (mapping.getParentQueue() != null - && mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) - && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - if (this.queueManager - .getQueue(groups.getGroups(user).get(0)) != null) { - QueueMapping queueMapping = - QueueMappingBuilder.create() - .type(mapping.getType()) - .source(mapping.getSource()).queue(user) - .parentQueue(groups.getGroups(user).get(0)) - .build(); - validateQueueMapping(queueMapping); - return getPlacementContext(queueMapping, user); - } else { - return null; - } - } else if (mapping.getParentQueue() != null - && mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING) - && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - String secondaryGroup = getSecondaryGroup(user); - if (secondaryGroup != null) { - QueueMapping queueMapping = - QueueMappingBuilder.create() - .type(mapping.getType()) - .source(mapping.getSource()) - .queue(user) - .parentQueue(secondaryGroup) - .build(); - validateQueueMapping(queueMapping); - return getPlacementContext(queueMapping, user); - } else { - return null; - } - } else if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getPlacementContext(mapping, user); - } else if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { - if (this.queueManager - .getQueue(groups.getGroups(user).get(0)) != null) { - return getPlacementContext(mapping, - groups.getGroups(user).get(0)); - } else { - return null; - } - } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { - String secondaryGroup = getSecondaryGroup(user); - if (secondaryGroup != null) { - return getPlacementContext(mapping, secondaryGroup); - } else { - return null; - } - } else { - return getPlacementContext(mapping); - } - } - if (user.equals(mapping.getSource())) { - if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { - return getPlacementContext(mapping, groups.getGroups(user).get(0)); - } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { - String secondaryGroup = getSecondaryGroup(user); - if (secondaryGroup != null) { - return getPlacementContext(mapping, secondaryGroup); - } else { - return null; - } - } else { - return getPlacementContext(mapping); - } - } - } - if (mapping.getType() == MappingType.GROUP) { - for (String userGroups : groups.getGroups(user)) { - if (userGroups.equals(mapping.getSource())) { - if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getPlacementContext(mapping, user); - } - return getPlacementContext(mapping); - } - } - } - } - return null; - } - @Override public ApplicationPlacementContext getPlacementForApp( ApplicationSubmissionContext asc, String user) @@ -188,7 +78,8 @@ public ApplicationPlacementContext getPlacementForApp( ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - ApplicationPlacementContext mappedQueue = getPlacementForUser(user); + ApplicationPlacementContext mappedQueue = + placementFactory.getPlacementForUser(mappings, user); if (mappedQueue != null) { // We have a mapping, should we use it? if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -210,20 +101,6 @@ public ApplicationPlacementContext getPlacementForApp( return null; } - private ApplicationPlacementContext getPlacementContext( - QueueMapping mapping) { - return getPlacementContext(mapping, mapping.getQueue()); - } - - private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, - String leafQueueName) { - if (!StringUtils.isEmpty(mapping.getParentQueue())) { - return new ApplicationPlacementContext(leafQueueName, - mapping.getParentQueue()); - } else{ - return new ApplicationPlacementContext(leafQueueName); - } - } @VisibleForTesting @Override @@ -313,6 +190,8 @@ public boolean initialize(ResourceScheduler scheduler) this.groups = Groups.getUserToGroupsMappingService( ((CapacityScheduler)scheduler).getConf()); this.overrideWithQueueMappings = overrideWithQueueMappings; + this.placementFactory = new UserGroupPlacementContextFactory( + groups, queueManager); return true; } return false; @@ -383,28 +262,6 @@ private static boolean isStaticQueueMapping(QueueMapping mapping) { .contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING); } - private void validateQueueMapping(QueueMapping queueMapping) - throws IOException { - String parentQueueName = queueMapping.getParentQueue(); - String leafQueueName = queueMapping.getQueue(); - CSQueue parentQueue = queueManager.getQueue(parentQueueName); - CSQueue leafQueue = queueManager.getQueue(leafQueueName); - - if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) { - throw new IOException("mapping contains invalid or non-leaf queue : " - + leafQueueName); - } else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) { - throw new IOException( - "mapping contains invalid parent queue [" + parentQueueName + "]"); - } else if (!parentQueue.getQueueName() - .equals(leafQueue.getParent().getQueueName())) { - throw new IOException("mapping contains invalid parent queue " - + "which does not match existing leaf queue's parent : [" - + parentQueue.getQueueName() + "] does not match [ " - + leafQueue.getParent().getQueueName() + "]"); - } - } - @VisibleForTesting public List getQueueMappings() { return mappings; @@ -415,4 +272,11 @@ private void validateQueueMapping(QueueMapping queueMapping) public void setQueueManager(CapacitySchedulerQueueManager queueManager) { this.queueManager = queueManager; } + + @VisibleForTesting + @Private + public void setPlacementFactory( + UserGroupPlacementContextFactory placementFactory) { + this.placementFactory = placementFactory; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java new file mode 100644 index 00000000000..da9ce1a85ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING; + +/** + * Isolates CapacityScheduler specific queue mapping logic. + */ +public class UserGroupPlacementContextFactory { + + private static final Logger LOG = LoggerFactory.getLogger( + UserGroupPlacementContextFactory.class); + + private final Groups groups; + private final CapacitySchedulerQueueManager queueManager; + + public UserGroupPlacementContextFactory() { + this.groups = null; + this.queueManager = null; + } + + public UserGroupPlacementContextFactory( + Groups groups, CapacitySchedulerQueueManager queueManager) { + this.groups = groups; + this.queueManager = queueManager; + } + + /** + * Finds the first successful mapping in a collection of {@link QueueMapping}. + * The dynamic mapping resolution is based on the given user. + * + * @param mappings Mapping config + * @param user Current user + * @return Information about application placement if any of the mappings + * were successful; null otherwise + * @throws IOException + */ + public ApplicationPlacementContext getPlacementForUser( + Collection mappings, String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.getType() == QueueMapping.MappingType.USER) { + if (isCurrentUserToGroupParentMapping(mapping)) { + return getUserToGroupPlacement(user, mapping); + } + if (mapping.getSource().equals(CURRENT_USER_MAPPING)) { + if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + return getPlacementContext(mapping, user); + } + } + if (user.equals(mapping.getSource()) + || mapping.getSource().equals(CURRENT_USER_MAPPING)) { + switch (mapping.getQueue()) { + case PRIMARY_GROUP_MAPPING: + return getPlacementContext(mapping, groups.getGroups(user).get(0)); + case SECONDARY_GROUP_MAPPING: + return getNullablePlacementContext( + mapping, getSecondaryGroup(user)); + default: + return getPlacementContext(mapping); + } + } + } + if (mapping.getType() == QueueMapping.MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.getSource())) { + if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + return getPlacementContext(mapping, user); + } + return getPlacementContext(mapping); + } + } + } + } + return null; + } + + private ApplicationPlacementContext getUserToGroupPlacement( + String user, QueueMapping mapping) throws IOException { + String group; + if (mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)) { + group = getPrimaryGroup(user); + } else { + group = getSecondaryGroup(user); + } + + if (group != null) { + QueueMapping queueMapping = + QueueMapping.QueueMappingBuilder.create() + .type(mapping.getType()) + .source(mapping.getSource()) + .queue(user) + .parentQueue(group) + .build(); + validateQueueMapping(queueMapping); + return getPlacementContext(queueMapping, user); + } else { + return null; + } + } + + private boolean isCurrentUserToGroupParentMapping(QueueMapping queueMapping) { + return queueMapping.getSource().equals(CURRENT_USER_MAPPING) + && queueMapping.getParentQueue() != null + && (queueMapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) + || queueMapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)) + && queueMapping.getQueue().equals(CURRENT_USER_MAPPING); + } + + private void validateQueueMapping(QueueMapping queueMapping) + throws IOException { + String parentQueueName = queueMapping.getParentQueue(); + String leafQueueName = queueMapping.getQueue(); + CSQueue parentQueue = queueManager.getQueue(parentQueueName); + CSQueue leafQueue = queueManager.getQueue(leafQueueName); + + if (!(leafQueue instanceof LeafQueue)) { + throw new IOException("mapping contains invalid or non-leaf queue : " + + leafQueueName); + } else if (!(parentQueue instanceof ParentQueue)) { + throw new IOException( + "mapping contains invalid parent queue [" + parentQueueName + "]"); + } else if (!parentQueue.getQueueName() + .equals(leafQueue.getParent().getQueueName())) { + throw new IOException("mapping contains invalid parent queue " + + "which does not match existing leaf queue's parent : [" + + parentQueue.getQueueName() + "] does not match [ " + + leafQueue.getParent().getQueueName() + "]"); + } + } + + private String getPrimaryGroup(String user) throws IOException { + String group = groups.getGroups(user).get(0); + CSQueue csQueue = this.queueManager.getQueue(group); + if (csQueue != null) { + return group; + } + + return null; + } + + private String getSecondaryGroup(String user) throws IOException { + List groupsList = groups.getGroups(user); + String secondaryGroup = null; + // Traverse all secondary groups (as there could be more than one + // and position is not guaranteed) and ensure there is queue with + // the same name + for (int i = 1; i < groupsList.size(); i++) { + if (this.queueManager.getQueue(groupsList.get(i)) != null) { + secondaryGroup = groupsList.get(i); + break; + } + } + + if (secondaryGroup == null && LOG.isDebugEnabled()) { + LOG.debug("User {} is not associated with any Secondary " + + "Group. Hence it may use the 'default' queue", user); + } + return secondaryGroup; + } + + private ApplicationPlacementContext getNullablePlacementContext( + QueueMapping queueMapping, String queue) { + if (queue != null) { + return getPlacementContext(queueMapping, queue); + } else { + return null; + } + } + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping) { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping, String leafQueueName) { + if (!StringUtils.isEmpty(mapping.getParentQueue())) { + return new ApplicationPlacementContext(leafQueueName, + mapping.getParentQueue()); + } else { + return new ApplicationPlacementContext(leafQueueName); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java index 3b85fdad08d..e0b6230546c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java @@ -82,6 +82,8 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception { UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule( false, Arrays.asList(userQueueMapping), null); + + ugRule.setPlacementFactory(new UserGroupPlacementContextFactory()); queuePlacementRules.add(ugRule); pm.updateRules(queuePlacementRules); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index e663a14c462..86654a26aa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -89,7 +89,10 @@ private void verifyQueueMapping(QueueMappingTestData queueMappingTestData) when(queueManager.getQueue("bsubgroup2")).thenReturn(bsubgroup2); when(queueManager.getQueue("asubgroup2")).thenReturn(asubgroup2); + UserGroupPlacementContextFactory placementFactory = + new UserGroupPlacementContextFactory(groups, queueManager); rule.setQueueManager(queueManager); + rule.setPlacementFactory(placementFactory); ApplicationSubmissionContext asc = Records.newRecord( ApplicationSubmissionContext.class); asc.setQueue(inputQueue);