diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 49d2ff05801..05de3a30c78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1859,6 +1859,12 @@ public static boolean isAclEnabled(Configuration conf) { "container-monitor.procfs-tree.smaps-based-rss.enabled"; public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = false; + public static final String APPLICATION_TAG_BASED_PLACEMENT_ENABLED = + "application-tag.based-placement.enable"; + public static final boolean DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED = + false; + public static final String APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST = + "application-tag.based-placement.username.whitelist"; /** Enable/disable container metrics. */ @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index b30224e78fb..440d731b4ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -21,11 +21,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -247,7 +247,7 @@ private void initApplicationTags() { return; } ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - this.applicationTags = new HashSet(); + this.applicationTags = new TreeSet<>(); this.applicationTags.addAll(p.getApplicationTagsList()); } @@ -305,7 +305,7 @@ public synchronized void setApplicationTags(Set tags) { } checkTags(tags); // Convert applicationTags to lower case and add - this.applicationTags = new HashSet(); + this.applicationTags = new TreeSet<>(); for (String tag : tags) { this.applicationTags.add(StringUtils.toLowerCase(tag)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 62f237f99aa..c9ca28ac365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1288,6 +1288,26 @@ false + + + Whether to enable application placement based on userId passed via + application tags. When it is enabled, u=userId pattern will be searched + and if found the application will be placed in the found user's queue, + if the original user has proper rights on the passed user's queue. + + application-tag.based-placement.enable + false + + + + + Comma separated list of users who can use the application tag based + placement, if it is enabled. + + application-tag.based-placement.username.whitelist + + + How long to keep aggregation logs before deleting them. -1 disables. Be careful set this too small and you will spam the name node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f4f97936c55..45bd1a52226 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -889,7 +889,10 @@ ApplicationPlacementContext placeApplication( ApplicationPlacementContext placementContext = null; if (placementManager != null) { try { - placementContext = placementManager.placeApplication(context, user); + String usernameUsedForPlacement = + getUserNameForPlacement(user, context, placementManager); + placementContext = placementManager + .placeApplication(context, usernameUsedForPlacement); } catch (YarnException e) { // Placement could also fail if the user doesn't exist in system // skip if the user is not found during recovery. @@ -916,6 +919,78 @@ ApplicationPlacementContext placeApplication( return placementContext; } + @VisibleForTesting + protected String getUserNameForPlacement(final String user, + final ApplicationSubmissionContext context, + final PlacementManager placementManager) throws YarnException { + + boolean applicationTagBasedPlacementEnabled = conf + .getBoolean(YarnConfiguration.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, + YarnConfiguration.DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED); + + String usernameUsedForPlacement = user; + if (!applicationTagBasedPlacementEnabled) { + return usernameUsedForPlacement; + } + if (!isWhitelistedUser(user, conf)) { + LOG.warn("[{}] user is not allowed to do placement based " + + "on application tag"); + return usernameUsedForPlacement; + } + LOG.debug("Application tag based placement is enabled, checking for " + + "userId in the application tag"); + Set applicationTags = context.getApplicationTags(); + String userNameFromAppTag = getUserNameFromApplicationTag(applicationTags); + if (userNameFromAppTag != null) { + LOG.debug("Found [{}] userId in application tag", userNameFromAppTag); + UserGroupInformation callerUGI; + callerUGI = UserGroupInformation.createRemoteUser(userNameFromAppTag); + // check if the actual user has rights to submit application to the + // user's queue from the application tag + String queue = placementManager + .placeApplication(context, usernameUsedForPlacement).getQueue(); + if (callerUGI != null && scheduler + .checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) { + usernameUsedForPlacement = userNameFromAppTag; + } else { + LOG.warn("User [{}] from application tag does not have access to " + + "[{}] queue. " + "The placement is done for [{}] user", + userNameFromAppTag, queue, user); + } + } else { + LOG.warn("There is no userId passed. The placement is done fot [{}] user", + user); + } + return usernameUsedForPlacement; + } + + private boolean isWhitelistedUser(final String user, + final Configuration conf) { + String[] userWhitelist = conf.getStrings(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST); + if (userWhitelist == null || userWhitelist.length == 0) { + return false; + } + for (String s: userWhitelist) { + if (s.equals(user)) { + return true; + } + } + return false; + } + + private String getUserNameFromApplicationTag(Set applicationTags) { + String userIdTag = null; + String userIdPrefix = "u="; + for (String tag: applicationTags) { + if (tag.startsWith(userIdPrefix)) { + userIdTag = tag.split("=")[1]; + break; + } + } + return userIdTag; + } + private void copyPlacementQueueToSubmissionContext( ApplicationPlacementContext placementContext, ApplicationSubmissionContext context) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java index 36258b431f9..bbe5b6aef92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -103,5 +104,11 @@ public void submitApplication( super.submitApplication(submissionContext, System.currentTimeMillis(), user); } + + public String getUserNameForPlacement (final String user, + final ApplicationSubmissionContext context, + final PlacementManager placementManager) throws YarnException { + return super.getUserNameForPlacement(user, context, placementManager); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 906f1162b3c..2a254d7c566 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -94,12 +95,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.doAnswer; @@ -1197,4 +1200,128 @@ private static Resource mockResource() { return cloneReqs; } + @Test + public void testGetUserNameForPlacementTagBasedPlacementDisabled() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + checkUsername(expectedQueue, user, user); + } + + @Test + public void testGetUserNameForPlacementTagBasedPlacementEnabled() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + String expectedUser = "user2"; + String userIdTag = "u=" + expectedUser; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(true, user); + checkUsername(expectedQueue, user, expectedUser); + } + + @Test + public void testGetUserNameForPlacementTagBasedPlacementMultipleUserIds() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + String expectedUser = "user2"; + String userIdTag = "u=" + expectedUser; + String userIdTag2 = "u=user3"; + setApplicationTags("tag1", userIdTag, "tag2", userIdTag2); + enableApplicationTagPlacement(true, user); + checkUsername(expectedQueue, user, expectedUser); + } + + @Test + public void testGetUserNameForPlacementTagBasedPlacementNoUserId() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + setApplicationTags("tag1", "tag2"); + enableApplicationTagPlacement(true, user); + checkUsername(expectedQueue, user, user); + } + + @Test + public void testGetUserNameForPlacementTagBasedPlacementWrongUserId() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(false, user); + checkUsername(expectedQueue, user, user); + } + + @Test + public void testGetUserNameForPlacementNotWhitelistedUser() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(false, "someUser"); + checkUsername(expectedQueue, user, user); + } + + @Test + public void testGetUserNameForPlacementEmptyWhiteList() + throws YarnException { + + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(false); + checkUsername(expectedQueue, user, user); + } + + private void enableApplicationTagPlacement(boolean userHasAccessToQueue, + String ... whiteListedUsers) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true); + conf.setStrings(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + ResourceScheduler scheduler = mockResourceScheduler(); + when(scheduler.checkAccess(any(UserGroupInformation.class), + eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class))) + .thenReturn(userHasAccessToQueue); + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + appMonitor = new TestRMAppManager(rmContext, + new ClientToAMTokenSecretManagerInRM(), + scheduler, masterService, + new ApplicationACLsManager(conf), conf); + } + + private void checkUsername(final String queue, final String submittingUser, + final String expectedUser) + throws YarnException { + PlacementManager placementMgr = mock(PlacementManager.class); + ApplicationPlacementContext appContext + = new ApplicationPlacementContext(queue); + when(placementMgr.placeApplication(asContext, submittingUser)) + .thenReturn(appContext); + String userNameForPlacement = appMonitor + .getUserNameForPlacement(submittingUser,asContext,placementMgr); + Assert.assertEquals(expectedUser, userNameForPlacement); + } + + private void setApplicationTags (String... tags) { + Set applicationTags = new TreeSet<>(); + for (String tag: tags) { + applicationTags.add(tag); + } + asContext.setApplicationTags(applicationTags); + } }