diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index 391fb34c5d6..36a323dbdef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -60,6 +61,7 @@ private List mappings = null; private Groups groups; private CapacitySchedulerQueueManager queueManager; + private UserGroupPlacementContextFactory placementFactory = null; public UserGroupMappingPlacementRule(){ this(false, null, null); @@ -73,156 +75,6 @@ public UserGroupMappingPlacementRule(){ this.groups = groups; } - private String getPrimaryGroup(String user) throws IOException { - return groups.getGroups(user).get(0); - } - - private String getSecondaryGroup(String user) throws IOException { - List groupsList = groups.getGroups(user); - String secondaryGroup = null; - // Traverse all secondary groups (as there could be more than one - // and position is not guaranteed) and ensure there is queue with - // the same name - for (int i = 1; i < groupsList.size(); i++) { - if (this.queueManager.getQueue(groupsList.get(i)) != null) { - secondaryGroup = groupsList.get(i); - break; - } - } - - if (secondaryGroup == null && LOG.isDebugEnabled()) { - LOG.debug("User {} is not associated with any Secondary " - + "Group. Hence it may use the 'default' queue", user); - } - return secondaryGroup; - } - - private ApplicationPlacementContext getPlacementForUser(String user) - throws IOException { - for (QueueMapping mapping : mappings) { - if (mapping.getType().equals(MappingType.USER)) { - if (mapping.getSource().equals(CURRENT_USER_MAPPING)) { - if (mapping.getParentQueue() != null - && mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) - && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getContextForGroupParent(user, mapping, - getPrimaryGroup(user)); - } else if (mapping.getParentQueue() != null - && mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING) - && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getContextForGroupParent(user, mapping, - getSecondaryGroup(user)); - } else if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getPlacementContext(mapping, user); - } else if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { - return getContextForPrimaryGroup(user, mapping); - } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { - return getContextForSecondaryGroup(user, mapping); - } else { - return getPlacementContext(mapping); - } - } - - if (user.equals(mapping.getSource())) { - if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { - return getPlacementContext(mapping, getPrimaryGroup(user)); - } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { - String secondaryGroup = getSecondaryGroup(user); - if (secondaryGroup != null) { - return getPlacementContext(mapping, secondaryGroup); - } else { - return null; - } - } else { - return getPlacementContext(mapping); - } - } - } - if (mapping.getType().equals(MappingType.GROUP)) { - for (String userGroups : groups.getGroups(user)) { - if (userGroups.equals(mapping.getSource())) { - if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { - return getPlacementContext(mapping, user); - } - return getPlacementContext(mapping); - } - } - } - } - return null; - } - - // invoked for mappings: - // u:%user:[parent].%primary_group - // u:%user:%primary_group - private ApplicationPlacementContext getContextForPrimaryGroup( - String user, - QueueMapping mapping) throws IOException { - String group = - CapacitySchedulerConfiguration.ROOT + "." + getPrimaryGroup(user); - - String parent = mapping.getParentQueue(); - CSQueue groupQueue = queueManager.getQueue(group); - - if (parent != null) { - CSQueue parentQueue = queueManager.getQueue(parent); - - if (parentQueue instanceof ManagedParentQueue) { - return getPlacementContext(mapping, group); - } else { - return groupQueue == null ? null : getPlacementContext(mapping, group); - } - } else { - return groupQueue == null ? null : getPlacementContext(mapping, group); - } - } - - // invoked for mappings - // u:%user:%secondary_group - // u:%user:[parent].%secondary_group - private ApplicationPlacementContext getContextForSecondaryGroup( - String user, - QueueMapping mapping) throws IOException { - String secondaryGroup = getSecondaryGroup(user); - - if (secondaryGroup != null) { - CSQueue queue = this.queueManager.getQueue(secondaryGroup); - if ( queue != null) { - return getPlacementContext(mapping, queue.getQueuePath()); - } else { - return null; - } - } else { - return null; - } - } - - // invoked for mappings: - // u:%user:%primary_group.%user - // u:%user:%secondary_group.%user - private ApplicationPlacementContext getContextForGroupParent( - String user, - QueueMapping mapping, - String group) throws IOException { - - if (this.queueManager.getQueue(group) != null) { - // replace the group string - QueueMapping resolvedGroupMapping = - QueueMappingBuilder.create() - .type(mapping.getType()) - .source(mapping.getSource()) - .queue(user) - .parentQueue( - CapacitySchedulerConfiguration.ROOT + "." + - group) - .build(); - validateQueueMapping(resolvedGroupMapping); - return getPlacementContext(resolvedGroupMapping, user); - } else { - return null; - } - } - @Override public ApplicationPlacementContext getPlacementForApp( ApplicationSubmissionContext asc, String user) @@ -231,7 +83,8 @@ public ApplicationPlacementContext getPlacementForApp( ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - ApplicationPlacementContext mappedQueue = getPlacementForUser(user); + ApplicationPlacementContext mappedQueue = + placementFactory.getPlacementForUser(mappings, user); if (mappedQueue != null) { // We have a mapping, should we use it? if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -253,28 +106,6 @@ public ApplicationPlacementContext getPlacementForApp( return null; } - private ApplicationPlacementContext getPlacementContext( - QueueMapping mapping) throws IOException { - return getPlacementContext(mapping, mapping.getQueue()); - } - - private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, - String leafQueueName) throws IOException { - - //leafQueue name no longer identifies a queue uniquely checking ambiguity - if (!mapping.hasParentQueue() && queueManager.isAmbiguous(leafQueueName)) { - throw new IOException("mapping contains ambiguous leaf queue reference " + - leafQueueName); - } - - if (!StringUtils.isEmpty(mapping.getParentQueue())) { - return new ApplicationPlacementContext(leafQueueName, - mapping.getParentQueue()); - } else{ - return new ApplicationPlacementContext(leafQueueName); - } - } - @VisibleForTesting @Override public boolean initialize(ResourceScheduler scheduler) @@ -363,6 +194,8 @@ public boolean initialize(ResourceScheduler scheduler) this.groups = Groups.getUserToGroupsMappingService( ((CapacityScheduler)scheduler).getConf()); this.overrideWithQueueMappings = overrideWithQueueMappings; + this.placementFactory = new UserGroupPlacementContextFactory( + groups, queueManager); return true; } return false; @@ -435,37 +268,6 @@ private static boolean isStaticQueueMapping(QueueMapping mapping) { .contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING); } - private void validateQueueMapping(QueueMapping queueMapping) - throws IOException { - String parentQueueName = queueMapping.getParentQueue(); - String leafQueueFullName = queueMapping.getFullPath(); - CSQueue parentQueue = queueManager.getQueueByFullName(parentQueueName); - CSQueue leafQueue = queueManager.getQueue(leafQueueFullName); - - if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) { - //this might be confusing, but a mapping is not guaranteed to provide the - //parent queue's name, which can result in ambiguous queue references - //if no parent queueName is provided mapping.getFullPath() is the same - //as mapping.getQueue() - if (leafQueue == null && queueManager.isAmbiguous(leafQueueFullName)) { - throw new IOException("mapping contains ambiguous leaf queue name: " - + leafQueueFullName); - } else { - throw new IOException("mapping contains invalid or non-leaf queue : " - + leafQueueFullName); - } - } else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) { - throw new IOException( - "mapping contains invalid parent queue [" + parentQueueName + "]"); - } else if (!parentQueue.getQueuePath() - .equals(leafQueue.getParent().getQueuePath())) { - throw new IOException("mapping contains invalid parent queue " - + "which does not match existing leaf queue's parent : [" - + parentQueue.getQueuePath() + "] does not match [ " - + leafQueue.getParent().getQueuePath() + "]"); - } - } - @VisibleForTesting public List getQueueMappings() { return mappings; @@ -476,4 +278,11 @@ private void validateQueueMapping(QueueMapping queueMapping) public void setQueueManager(CapacitySchedulerQueueManager queueManager) { this.queueManager = queueManager; } + + @VisibleForTesting + @Private + public void setPlacementFactory( + UserGroupPlacementContextFactory placementFactory) { + this.placementFactory = placementFactory; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java new file mode 100644 index 00000000000..cfd353212b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupPlacementContextFactory.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING; + +/** + * Isolates CapacityScheduler specific queue mapping resolution logic. + */ +public class UserGroupPlacementContextFactory { + + private static final Logger LOG = LoggerFactory.getLogger( + UserGroupPlacementContextFactory.class); + + private final Groups groups; + private final CapacitySchedulerQueueManager queueManager; + + public UserGroupPlacementContextFactory( + Groups groups, CapacitySchedulerQueueManager queueManager) { + this.groups = groups; + this.queueManager = queueManager; + } + + /** + * Finds the first successful mapping in a collection of {@link QueueMapping}. + * The dynamic mapping resolution is based on the given user. + * + * @param mappings mapping config + * @param user current user + * @return information about application placement if any of the mappings + * were successful; null otherwise + * @throws IOException + */ + public ApplicationPlacementContext getPlacementForUser( + Collection mappings, String user) throws IOException { + for (QueueMapping mapping : mappings) { + String sourceName = mapping.getSource(); + String queueName = mapping.getQueue(); + String parentQueue = mapping.getParentQueue(); + QueueMapping.MappingType mappingType = mapping.getType(); + + if (mappingType == QueueMapping.MappingType.USER) { + if (isCurrentUserParentMapping(mapping)) { + switch (parentQueue) { + case PRIMARY_GROUP_MAPPING: + return getUserToParentGroupPlacement( + user, mapping, getPrimaryGroup(user)); + case SECONDARY_GROUP_MAPPING: + return getUserToParentGroupPlacement( + user, mapping, getSecondaryGroup(user)); + } + } + if (sourceName.equals(CURRENT_USER_MAPPING)) { + switch (queueName) { + case CURRENT_USER_MAPPING: + return getPlacementContext(mapping, user); + case PRIMARY_GROUP_MAPPING: + return getPrimaryGroupContext(mapping, user); + case SECONDARY_GROUP_MAPPING: + return getContextByCheckedQueue(mapping, getSecondaryGroup(user)); + default: + return getPlacementContext(mapping); + } + } + if (sourceName.equals(user)) { + switch (queueName) { + case PRIMARY_GROUP_MAPPING: + return getPlacementContext(mapping, getPrimaryGroup(user)); + case SECONDARY_GROUP_MAPPING: + return getContextByCheckedQueueName( + mapping, getSecondaryGroup(user)); + default: + return getPlacementContext(mapping); + } + } + } + if (mappingType == QueueMapping.MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(sourceName)) { + if (queueName.equals(CURRENT_USER_MAPPING)) { + return getPlacementContext(mapping, user); + } + return getPlacementContext(mapping); + } + } + } + } + return null; + } + + /** + * Creates an {@link ApplicationPlacementContext} with a checked parent queue. + * + * @param user current user + * @param mapping mapping config + * @return the context, if a queue already exists with the name of the group; + * null otherwise + * @throws IOException + */ + private ApplicationPlacementContext getUserToParentGroupPlacement( + String user, QueueMapping mapping, String group) throws IOException { + if (queueManager.getQueue(group) != null) { + QueueMapping queueMapping = + QueueMapping.QueueMappingBuilder.create() + .type(mapping.getType()) + .source(mapping.getSource()) + .queue(user) + .parentQueue(CapacitySchedulerConfiguration.ROOT + "." + + group) + .build(); + validateQueueMapping(queueMapping); + return getPlacementContext(queueMapping, user); + } + + return null; + } + + /** + * Checks if the mapping is in the form of %user:parent.%user + * + * @param queueMapping mapping config + * @return true if mapping source and queue value is the + * current user mapping, and the parentQueue is present; + * false otherwise + */ + private boolean isCurrentUserParentMapping(QueueMapping queueMapping) { + return queueMapping.getSource().equals(CURRENT_USER_MAPPING) + && queueMapping.getParentQueue() != null + && queueMapping.getQueue().equals(CURRENT_USER_MAPPING); + } + + private void validateQueueMapping(QueueMapping queueMapping) + throws IOException { + String parentQueueName = queueMapping.getParentQueue(); + String leafQueueFullName = queueMapping.getFullPath(); + CSQueue parentQueue = queueManager.getQueueByFullName(parentQueueName); + CSQueue leafQueue = queueManager.getQueue(leafQueueFullName); + + if ((!(leafQueue instanceof LeafQueue))) { + //this might be confusing, but a mapping is not guaranteed to provide the + //parent queue's name, which can result in ambiguous queue references + //if no parent queueName is provided mapping.getFullPath() is the same + //as mapping.getQueue() + if (leafQueue == null && queueManager.isAmbiguous(leafQueueFullName)) { + throw new IOException("mapping contains ambiguous leaf queue name: " + + leafQueueFullName); + } else { + throw new IOException("mapping contains invalid or non-leaf queue : " + + leafQueueFullName); + } + } else if ((!(parentQueue instanceof ParentQueue))) { + throw new IOException( + "mapping contains invalid parent queue [" + parentQueueName + "]"); + } else if (!parentQueue.getQueuePath() + .equals(leafQueue.getParent().getQueuePath())) { + throw new IOException("mapping contains invalid parent queue " + + "which does not match existing leaf queue's parent : [" + + parentQueue.getQueuePath() + "] does not match [ " + + leafQueue.getParent().getQueuePath() + "]"); + } + } + + private String getPrimaryGroup(String user) throws IOException { + return groups.getGroups(user).get(0); + } + + private String getSecondaryGroup(String user) throws IOException { + List groupsList = groups.getGroups(user); + String secondaryGroup = null; + // Traverse all secondary groups (as there could be more than one + // and position is not guaranteed) and ensure there is queue with + // the same name + for (int i = 1; i < groupsList.size(); i++) { + if (queueManager.getQueue(groupsList.get(i)) != null) { + secondaryGroup = groupsList.get(i); + break; + } + } + + if (secondaryGroup == null && LOG.isDebugEnabled()) { + LOG.debug("User {} is not associated with any Secondary " + + "Group. Hence it may use the 'default' queue", user); + } + return secondaryGroup; + } + + private ApplicationPlacementContext getPrimaryGroupContext( + QueueMapping queueMapping, String user) throws IOException { + String group = CapacitySchedulerConfiguration.ROOT + "." + + getPrimaryGroup(user); + + String parent = queueMapping.getParentQueue(); + CSQueue groupQueue = queueManager.getQueue(group); + + if (parent != null) { + CSQueue parentQueue = queueManager.getQueue(parent); + + if (parentQueue instanceof ManagedParentQueue) { + return getPlacementContext(queueMapping, group); + } else { + return groupQueue == null ? null : getPlacementContext( + queueMapping, group); + } + } else { + return groupQueue == null ? null : getPlacementContext( + queueMapping, group); + } + } + + private ApplicationPlacementContext getContextByCheckedQueueName( + QueueMapping queueMapping, String queue) { + if (queue != null) { + return getPlacementContext(queueMapping, queue); + } else { + return null; + } + } + + private ApplicationPlacementContext getContextByCheckedQueue( + QueueMapping queueMapping, String queue) { + if (queue == null) { + return null; + } + + CSQueue csQueue = queueManager.getQueue(queue); + if (csQueue != null) { + return getPlacementContext(queueMapping, csQueue.getQueuePath()); + } else { + return null; + } + } + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping) { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping, String leafQueueName) { + if (!StringUtils.isEmpty(mapping.getParentQueue())) { + return new ApplicationPlacementContext(leafQueueName, + mapping.getParentQueue()); + } else { + return new ApplicationPlacementContext(leafQueueName); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java index 3b85fdad08d..6558bdb16ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java @@ -82,6 +82,9 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception { UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule( false, Arrays.asList(userQueueMapping), null); + + ugRule.setPlacementFactory(new UserGroupPlacementContextFactory( + null, null)); queuePlacementRules.add(ugRule); pm.updateRules(queuePlacementRules); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index 1d7b6b7ff79..0f5420b1676 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -113,6 +113,10 @@ private void verifyQueueMapping(QueueMappingTestData queueMappingTestData) .thenReturn(managedParent); + UserGroupPlacementContextFactory placementFactory = + new UserGroupPlacementContextFactory(groups, queueManager); + rule.setQueueManager(queueManager); + rule.setPlacementFactory(placementFactory); rule.setQueueManager(queueManager); ApplicationSubmissionContext asc = Records.newRecord( ApplicationSubmissionContext.class);