diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 2b806ddde89..ca7499df126 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -25,7 +25,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ApplicationTags; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -105,8 +105,6 @@ private boolean nodeLabelsEnabled; private Set exclusiveEnforcedPartitions; - private static final String USER_ID_PREFIX = "userid="; - public RMAppManager(RMContext context, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationACLsManager applicationACLsManager, Configuration conf) { @@ -940,8 +938,15 @@ protected String getUserNameForPlacement(final String user, } LOG.debug("Application tag based placement is enabled, checking for " + "'userid' among the application tags"); + Set applicationTags = context.getApplicationTags(); - String userNameFromAppTag = getUserNameFromApplicationTag(applicationTags); + String userNameFromAppTag = null; + try { + userNameFromAppTag = ApplicationTags.getUserTag(applicationTags); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage()); + } + if (userNameFromAppTag != null) { LOG.debug("Found 'userid' '{}' in application tag", userNameFromAppTag); UserGroupInformation callerUGI = UserGroupInformation @@ -984,20 +989,6 @@ private boolean isWhitelistedUser(final String user, return false; } - private String getUserNameFromApplicationTag(Set applicationTags) { - for (String tag: applicationTags) { - if (tag.startsWith(USER_ID_PREFIX)) { - String[] userIdTag = tag.split("="); - if (userIdTag.length == 2) { - return userIdTag[1]; - } else { - LOG.warn("Found wrongly qualified username in tag"); - } - } - } - return null; - } - private void copyPlacementQueueToSubmissionContext( ApplicationPlacementContext placementContext, ApplicationSubmissionContext context) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index 6dce9c76c91..474fecea8b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ApplicationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; @@ -61,6 +63,8 @@ private Groups groups; private CapacitySchedulerQueueManager queueManager; + private Set groupTagWhitelist = null; + public UserGroupMappingPlacementRule(){ this(false, null, null); } @@ -92,7 +96,8 @@ private String getSecondaryGroup(String user) throws IOException { return secondaryGroup; } - private ApplicationPlacementContext getPlacementForUser(String user) + private ApplicationPlacementContext getPlacementForUser( + ApplicationSubmissionContext asc, String user) throws IOException { for (QueueMapping mapping : mappings) { if (mapping.getType() == MappingType.USER) { @@ -167,8 +172,9 @@ private ApplicationPlacementContext getPlacementForUser(String user) } } if (mapping.getType() == MappingType.GROUP) { + String source = getSourceByApplicationTag(asc, mapping.getSource()); for (String userGroups : groups.getGroups(user)) { - if (userGroups.equals(mapping.getSource())) { + if (userGroups.equals(source)) { if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { return getPlacementContext(mapping, user); } @@ -180,6 +186,28 @@ private ApplicationPlacementContext getPlacementForUser(String user) return null; } + private String getSourceByApplicationTag( + ApplicationSubmissionContext asc, String fallbackGroup) { + String groupTag = null; + + try { + groupTag = ApplicationTags.getGroupTag(asc.getApplicationTags()); + if (groupTag != null && !groupTagWhitelist.contains(groupTag)) { + LOG.warn("Group '{}' is not a whitelisted tag. Check the " + + "CapacityScheduler configuration!", groupTag); + groupTag = fallbackGroup; + } + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage()); + } + + if (groupTag == null) { + groupTag = fallbackGroup; + } + + return groupTag; + } + @Override public ApplicationPlacementContext getPlacementForApp( ApplicationSubmissionContext asc, String user) @@ -188,7 +216,8 @@ public ApplicationPlacementContext getPlacementForApp( ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - ApplicationPlacementContext mappedQueue = getPlacementForUser(user); + ApplicationPlacementContext mappedQueue = + getPlacementForUser(asc, user); if (mappedQueue != null) { // We have a mapping, should we use it? if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -247,6 +276,7 @@ public boolean initialize(ResourceScheduler scheduler) List newMappings = new ArrayList<>(); queueManager = schedulerContext.getCapacitySchedulerQueueManager(); + groupTagWhitelist = conf.getGroupApplicationTagWhiteList(); // check if mappings refer to valid queues for (QueueMapping mapping : queueMappings) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationTags.java new file mode 100644 index 00000000000..283f5934dad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationTags.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Set; + +/** + * Contains application tags specific helper methods and constants. + */ +public class ApplicationTags { + + public static String USER_PREFIX = "userid"; + public static String GROUP_PREFIX = "groupid"; + + private ApplicationTags() {} + + /** + * Extracts a tag value from a collection of application tags. + * The tags have a specific format, eg. userid=value for userid + * tag. + * + * @param applicationTags Collection of formatted application tags + * @param tagPrefix Formatted tag prefix + * @return Extracted tag value without prefix + * @throws IllegalArgumentException if tag value is not properly + * formatted + */ + public static String getValueFromApplicationTag( + Set applicationTags, String tagPrefix) + throws IllegalArgumentException { + for (String tag: applicationTags) { + if (tag.startsWith(tagPrefix + "=")) { + String[] foundTag = tag.split("="); + if (foundTag.length == 2) { + return foundTag[1]; + } else { + throw new IllegalArgumentException( + "Found wrongly qualified value of tag prefix " + tagPrefix); + } + } + } + return null; + } + + public static String getGroupTag(Set applicationTags) + throws IllegalArgumentException { + return getValueFromApplicationTag(applicationTags, GROUP_PREFIX); + } + + public static String getUserTag(Set applicationTags) + throws IllegalArgumentException { + return getValueFromApplicationTag(applicationTags, USER_PREFIX); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d57e3919cdd..aadf767d6fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -279,6 +280,12 @@ @Private public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; + public static final String APPLICATION_TAG_BASED_MAPPING_PREFIX = + QUEUE_MAPPING + "-application-tag"; + + public static final String APPLICATION_TAG_BASED_MAPPING_GROUP_WHITELIST = + APPLICATION_TAG_BASED_MAPPING_PREFIX + ".group.whitelist"; + @Private public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable"; @@ -1089,6 +1096,28 @@ public boolean getOverrideWithWorkflowPriorityMappings() { return getTrimmedStringCollection(WORKFLOW_PRIORITY_MAPPINGS); } + public Set getGroupApplicationTagWhiteList() { + String[] whitelist = + getStrings(APPLICATION_TAG_BASED_MAPPING_GROUP_WHITELIST); + + if (whitelist == null) { + return Collections.emptySet(); + } + + return new HashSet<>(Arrays.asList(whitelist)); + } + + @Private + @VisibleForTesting + public void setApplicationTagBasedMappingGroupWhitelist( + Collection applicationTags) { + if (applicationTags == null) { + return; + } + String str = StringUtils.join(",", applicationTags); + setStrings(APPLICATION_TAG_BASED_MAPPING_GROUP_WHITELIST, str); + } + /** * Get user/group mappings to queues. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java index c2fea87c621..5f8793b12b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java @@ -31,13 +31,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ApplicationTags; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration; @@ -214,6 +218,84 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception { } } + @Test + public void testGroupApplicationTagBasedQueueMapping() + throws Exception { + String groupTag = "user1group"; + HashSet applicationTags = new HashSet<>(); + applicationTags.add(ApplicationTags.GROUP_PREFIX + "=" + groupTag); + Set whiteList = new HashSet<>(); + whiteList.add(groupTag); + + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + + List queuePlacementRules = new ArrayList<>(); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); + conf.setQueuePlacementRules(queuePlacementRules); + + List existingMappingsForUG = conf.getQueueMappings(); + + // set queue mapping + List queueMappingsForUG = new ArrayList<>(); + + // g:NOT_VALID_GROUP:%user + QueueMapping userQueueMapping1 = QueueMappingBuilder.create() + .type(QueueMapping.MappingType.GROUP) + .source("NOT_VALID_GROUP") + .queue("%user") + .build(); + + queueMappingsForUG.add(userQueueMapping1); + + existingMappingsForUG.addAll(queueMappingsForUG); + conf.setQueueMappings(existingMappingsForUG); + + conf.setApplicationTagBasedMappingGroupWhitelist(whiteList); + // override with queue mappings + conf.setOverrideWithQueueMappings(true); + + MockRM mockRM = null; + try { + mockRM = new MockRM(conf); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + cs.start(); + + ApplicationSubmissionContext asc = + Records.newRecord(ApplicationSubmissionContext.class); + asc.setQueue("default"); + asc.setApplicationTags(applicationTags); + + List rules = + cs.getRMContext().getQueuePlacementManager().getPlacementRules(); + + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); + + ApplicationPlacementContext ctx = + r.getPlacementForApp(asc, "user1"); + assertEquals("Replace non-existing group with application tag", + "user1", ctx.getQueue()); + + asc.setApplicationTags(Collections.emptySet()); + ApplicationPlacementContext ctx2 = + r.getPlacementForApp(asc, "user1"); + assertNull("Use the initial source on empty application tag", + ctx2); + + } finally { + if(mockRM != null) { + mockRM.close(); + } + } + + } @Test public void testNestedUserQueueWithPrimaryGroupAsDynamicParentQueue() throws Exception {