From 9ff6837d681081a8b8548ced170352d917c21f1e Mon Sep 17 00:00:00 2001 From: Sunil Date: Mon, 26 Dec 2016 19:59:14 +0530 Subject: [PATCH] YARN-3955 --- YARN-3955.0006.patch | 1610 ++++++++++++++++++++ .../sls/scheduler/ResourceSchedulerWrapper.java | 2 +- .../apache/hadoop/yarn/security/AccessType.java | 2 + .../conf/capacity-scheduler.xml | 9 + .../server/resourcemanager/ClientRMService.java | 3 +- .../yarn/server/resourcemanager/RMAppManager.java | 16 +- .../scheduler/AbstractYarnScheduler.java | 9 +- .../resourcemanager/scheduler/YarnScheduler.java | 11 +- .../scheduler/capacity/CapacityScheduler.java | 61 +- .../capacity/CapacitySchedulerConfiguration.java | 43 +- .../capacity/CapacitySchedulerContext.java | 3 + .../capacity/CapacitySchedulerQueueManager.java | 21 +- .../scheduler/capacity/LeafQueue.java | 16 + .../capacity/PriorityACLConfiguration.java | 212 +++ .../scheduler/capacity/PriorityACLGroup.java | 98 ++ .../security/AppPriorityACLsManager.java | 216 +++ .../yarn/server/resourcemanager/ACLsTestBase.java | 5 +- .../TestApplicationMasterService.java | 5 +- .../resourcemanager/TestClientRMService.java | 2 +- .../capacity/TestApplicationPriority.java | 25 +- .../TestApplicationPriorityACLConfiguration.java | 120 ++ .../capacity/TestApplicationPriorityACLs.java | 206 +++ .../scheduler/capacity/TestParentQueue.java | 6 +- 23 files changed, 2646 insertions(+), 55 deletions(-) create mode 100644 YARN-3955.0006.patch create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java diff --git a/YARN-3955.0006.patch b/YARN-3955.0006.patch new file mode 100644 index 0000000..7bc51db --- /dev/null +++ b/YARN-3955.0006.patch @@ -0,0 +1,1610 @@ +From 3c04d985a27cb6e98f487644bc478d12c8dd4ff9 Mon Sep 17 00:00:00 2001 +From: Sunil +Date: Mon, 26 Dec 2016 19:57:10 +0530 +Subject: [PATCH] YARN-3955 + +--- + .../sls/scheduler/ResourceSchedulerWrapper.java | 2 +- + .../apache/hadoop/yarn/security/AccessType.java | 2 + + .../conf/capacity-scheduler.xml | 9 + + .../server/resourcemanager/ClientRMService.java | 3 +- + .../yarn/server/resourcemanager/RMAppManager.java | 16 +- + .../scheduler/AbstractYarnScheduler.java | 9 +- + .../resourcemanager/scheduler/YarnScheduler.java | 11 +- + .../scheduler/capacity/CapacityScheduler.java | 61 +++--- + .../capacity/CapacitySchedulerConfiguration.java | 43 +++- + .../capacity/CapacitySchedulerContext.java | 3 + + .../capacity/CapacitySchedulerQueueManager.java | 21 +- + .../scheduler/capacity/LeafQueue.java | 16 ++ + .../capacity/PriorityACLConfiguration.java | 212 ++++++++++++++++++++ + .../scheduler/capacity/PriorityACLGroup.java | 89 +++++++++ + .../security/AppPriorityACLsManager.java | 216 +++++++++++++++++++++ + .../yarn/server/resourcemanager/ACLsTestBase.java | 5 +- + .../TestApplicationMasterService.java | 5 +- + .../resourcemanager/TestClientRMService.java | 2 +- + .../capacity/TestApplicationPriority.java | 25 ++- + .../TestApplicationPriorityACLConfiguration.java | 120 ++++++++++++ + .../capacity/TestApplicationPriorityACLs.java | 206 ++++++++++++++++++++ + .../scheduler/capacity/TestParentQueue.java | 6 +- + 22 files changed, 1027 insertions(+), 55 deletions(-) + create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java + create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java + create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java + create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java + create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java + +diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +index 79f934c..e3a757c 100644 +--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ++++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +@@ -964,7 +964,7 @@ protected void completedContainerInternal(RMContainer rmContainer, + + @Override + public Priority checkAndGetApplicationPriority(Priority priority, +- String user, String queueName, ApplicationId applicationId) ++ UserGroupInformation user, String queueName, ApplicationId applicationId) + throws YarnException { + // TODO Dummy implementation. + return Priority.newInstance(0); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java +index 32459b9..f60caf2 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java +@@ -30,4 +30,6 @@ + // queue + SUBMIT_APP, + ADMINISTER_QUEUE, ++ // application ++ ACCESS_PRIORITY, + } +\ No newline at end of file +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +index 6ac726e..75f4c0a 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +@@ -98,6 +98,15 @@ + + + ++ yarn.scheduler.capacity.root.default.acl_access_priority ++ * ++ ++ The ACL of who can submit applications with configured priority. ++ For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] ++ ++ ++ ++ + yarn.scheduler.capacity.node-locality-delay + 40 + +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +index 3dc7e38..0f56a2c 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +@@ -1614,7 +1614,8 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( + } + + try { +- rmAppManager.updateApplicationPriority(applicationId, newAppPriority); ++ rmAppManager.updateApplicationPriority(callerUGI, applicationId, ++ newAppPriority); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +index 4d628ee..5069d2e 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +@@ -350,19 +350,19 @@ private RMAppImpl createAndPopulateNewRMApp( + submissionContext, user); + } + } +- ++ + ApplicationId applicationId = submissionContext.getApplicationId(); + ResourceRequest amReq = + validateAndCreateResourceRequest(submissionContext, isRecovery); + + // Verify and get the update application priority and set back to + // submissionContext ++ UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); + Priority appPriority = scheduler.checkAndGetApplicationPriority( +- submissionContext.getPriority(), user, submissionContext.getQueue(), ++ submissionContext.getPriority(), userUgi, submissionContext.getQueue(), + applicationId); + submissionContext.setPriority(appPriority); + +- UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); + // Since FairScheduler queue mapping is done inside scheduler, + // if FairScheduler is used and the queue doesn't exist, we should not + // fail here because queue will be created inside FS. Ideally, FS queue +@@ -561,12 +561,14 @@ public void updateApplicationTimeout(RMApp app, + /** + * updateApplicationPriority will invoke scheduler api to update the + * new priority to RM and StateStore. ++ * @param callerUGI user + * @param applicationId Application Id + * @param newAppPriority proposed new application priority + * @throws YarnException Handle exceptions + */ +- public void updateApplicationPriority(ApplicationId applicationId, +- Priority newAppPriority) throws YarnException { ++ public void updateApplicationPriority(UserGroupInformation callerUGI, ++ ApplicationId applicationId, Priority newAppPriority) ++ throws YarnException { + RMApp app = this.rmContext.getRMApps().get(applicationId); + + synchronized (applicationId) { +@@ -579,8 +581,8 @@ public void updateApplicationPriority(ApplicationId applicationId, + + // Invoke scheduler api to update priority in scheduler and to + // State Store. +- Priority appPriority = rmContext.getScheduler() +- .updateApplicationPriority(newAppPriority, applicationId, future); ++ Priority appPriority = rmContext.getScheduler().updateApplicationPriority( ++ newAppPriority, applicationId, future, callerUGI); + + if (app.getApplicationPriority().equals(appPriority)) { + return; +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +index c0cc6b0..17f4cd0 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +@@ -35,6 +35,7 @@ + import org.apache.hadoop.classification.InterfaceAudience.Private; + import org.apache.hadoop.classification.InterfaceStability.Unstable; + import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.service.AbstractService; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; + import org.apache.hadoop.yarn.api.records.ApplicationId; +@@ -779,8 +780,8 @@ protected void refreshMaximumAllocation(Resource newMaxAlloc) { + } + + @Override +- public Priority checkAndGetApplicationPriority(Priority priorityFromContext, +- String user, String queueName, ApplicationId applicationId) ++ public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, ++ UserGroupInformation user, String queueName, ApplicationId applicationId) + throws YarnException { + // Dummy Implementation till Application Priority changes are done in + // specific scheduler. +@@ -789,8 +790,8 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, + + @Override + public Priority updateApplicationPriority(Priority newPriority, +- ApplicationId applicationId, SettableFuture future) +- throws YarnException { ++ ApplicationId applicationId, SettableFuture future, ++ UserGroupInformation user) throws YarnException { + // Dummy Implementation till Application Priority changes are done in + // specific scheduler. + return Priority.newInstance(0); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +index ea1ae60..608f275 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +@@ -311,7 +311,7 @@ void setEntitlement(String queue, QueueEntitlement entitlement) + * Verify whether a submitted application priority is valid as per configured + * Queue + * +- * @param priorityFromContext ++ * @param priorityRequestedByApp + * Submitted Application priority. + * @param user + * User who submitted the Application +@@ -321,8 +321,8 @@ void setEntitlement(String queue, QueueEntitlement entitlement) + * Application ID + * @return Updated Priority from scheduler + */ +- public Priority checkAndGetApplicationPriority(Priority priorityFromContext, +- String user, String queueName, ApplicationId applicationId) ++ public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, ++ UserGroupInformation user, String queueName, ApplicationId applicationId) + throws YarnException; + + /** +@@ -334,12 +334,13 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, + * @param applicationId Application ID + * + * @param future Sets any type of exception happened from StateStore ++ * @param user who submitted the application + * + * @return updated priority + */ + public Priority updateApplicationPriority(Priority newPriority, +- ApplicationId applicationId, SettableFuture future) +- throws YarnException; ++ ApplicationId applicationId, SettableFuture future, ++ UserGroupInformation user) throws YarnException; + + /** + * +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +index 9a73a65..d39ca7c 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +@@ -134,6 +134,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; ++import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; + import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; + import org.apache.hadoop.yarn.server.utils.Lock; + import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +@@ -224,6 +225,7 @@ public Configuration getConf() { + private List asyncSchedulerThreads; + private ResourceCommitterService resourceCommitterService; + private RMNodeLabelsManager labelManager; ++ private AppPriorityACLsManager appPriorityACLManager; + + /** + * EXPERT +@@ -305,8 +307,9 @@ void initScheduler(Configuration configuration) throws + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = new ConcurrentHashMap<>(); + this.labelManager = rmContext.getNodeLabelManager(); ++ this.appPriorityACLManager = new AppPriorityACLsManager(conf); + this.queueManager = new CapacitySchedulerQueueManager(yarnConf, +- this.labelManager); ++ this.labelManager, this.appPriorityACLManager); + this.queueManager.setCapacitySchedulerContext(this); + + this.activitiesManager = new ActivitiesManager(rmContext); +@@ -2180,45 +2183,57 @@ private String handleMoveToPlanQueue(String targetQueueName) { + } + + @Override +- public Priority checkAndGetApplicationPriority(Priority priorityFromContext, +- String user, String queueName, ApplicationId applicationId) +- throws YarnException { +- Priority appPriority = null; +- +- // ToDo: Verify against priority ACLs ++ public Priority checkAndGetApplicationPriority( ++ Priority priorityRequestedByApp, UserGroupInformation user, ++ String queueName, ApplicationId applicationId) throws YarnException { ++ Priority appPriority = priorityRequestedByApp; + + // Verify the scenario where priority is null from submissionContext. +- if (null == priorityFromContext) { ++ if (null == appPriority) { ++ // Verify whether submitted user has any default priority set. If so, ++ // user's default priority will get precedence over queue default. ++ // for updateApplicationPriority call flow, this check is done in ++ // CientRMService itself. ++ appPriority = this.appPriorityACLManager.getDefaultPriority(queueName, ++ user); ++ + // Get the default priority for the Queue. If Queue is non-existent, then +- // use default priority +- priorityFromContext = this.queueManager.getDefaultPriorityForQueue( +- queueName); ++ // use default priority. Do it only if user doesnt have any default. ++ if (null == appPriority) { ++ appPriority = this.queueManager.getDefaultPriorityForQueue(queueName); ++ } + +- LOG.info("Application '" + applicationId +- + "' is submitted without priority " +- + "hence considering default queue/cluster priority: " +- + priorityFromContext.getPriority()); ++ LOG.info( ++ "Application '" + applicationId + "' is submitted without priority " ++ + "hence considering default queue/cluster priority: " ++ + appPriority.getPriority()); + } + + // Verify whether submitted priority is lesser than max priority + // in the cluster. If it is out of found, defining a max cap. +- if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { +- priorityFromContext = Priority ++ if (appPriority.compareTo(getMaxClusterLevelAppPriority()) < 0) { ++ appPriority = Priority + .newInstance(getMaxClusterLevelAppPriority().getPriority()); + } + +- appPriority = priorityFromContext; ++ // Lets check for ACLs here. ++ if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) { ++ throw new YarnException(new AccessControlException( ++ "User " + user + " does not have permission to submit/update " ++ + applicationId + " for " + appPriority)); ++ } + +- LOG.info("Priority '" + appPriority.getPriority() +- + "' is acceptable in queue : " + queueName + " for application: " +- + applicationId + " for the user: " + user); ++ LOG.info( ++ "Priority '" + appPriority.getPriority() + "' is acceptable in queue : " ++ + queueName + " for application: " + applicationId); + + return appPriority; + } + + @Override + public Priority updateApplicationPriority(Priority newPriority, +- ApplicationId applicationId, SettableFuture future) ++ ApplicationId applicationId, SettableFuture future, ++ UserGroupInformation user) + throws YarnException { + Priority appPriority = null; + SchedulerApplication application = applications +@@ -2231,7 +2246,7 @@ public Priority updateApplicationPriority(Priority newPriority, + + RMApp rmApp = rmContext.getRMApps().get(applicationId); + +- appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), ++ appPriority = checkAndGetApplicationPriority(newPriority, user, + rmApp.getQueue(), applicationId); + + if (application.getPriority().equals(appPriority)) { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +index bfaeba4..4ed2155 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +@@ -38,6 +38,7 @@ + import org.apache.hadoop.security.authorize.AccessControlList; + import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hadoop.util.StringUtils; ++import org.apache.hadoop.yarn.api.records.Priority; + import org.apache.hadoop.yarn.api.records.QueueACL; + import org.apache.hadoop.yarn.api.records.QueueState; + import org.apache.hadoop.yarn.api.records.ReservationACL; +@@ -49,6 +50,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; + import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PriorityACLConfiguration.PriorityACLConfig; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +@@ -63,7 +65,7 @@ + + private static final Log LOG = + LogFactory.getLog(CapacitySchedulerConfiguration.class); +- ++ + private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml"; + + @Private +@@ -274,6 +276,8 @@ + @Private + public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + ++ PriorityACLConfiguration priorityACLConfig = new PriorityACLConfiguration(); ++ + public CapacitySchedulerConfiguration() { + this(new Configuration()); + } +@@ -602,6 +606,10 @@ private static String getAclKey(ReservationACL acl) { + return "acl_" + StringUtils.toLowerCase(acl.toString()); + } + ++ private static String getAclKey(AccessType acl) { ++ return "acl_" + StringUtils.toLowerCase(acl.toString()); ++ } ++ + @Override + public Map getReservationAcls(String + queue) { +@@ -627,6 +635,11 @@ private void setAcl(String queue, ReservationACL acl, String aclString) { + set(queuePrefix + getAclKey(acl), aclString); + } + ++ private void setAcl(String queue, AccessType acl, String aclString) { ++ String queuePrefix = getQueuePrefix(queue); ++ set(queuePrefix + getAclKey(acl), aclString); ++ } ++ + public Map getAcls(String queue) { + Map acls = + new HashMap(); +@@ -650,6 +663,34 @@ public void setReservationAcls(String queue, + } + } + ++ @VisibleForTesting ++ public void setPriorityAcls(String queue, Priority priority, ++ Priority defaultPriority, String[] acls) { ++ StringBuilder aclString = new StringBuilder(); ++ ++ StringBuilder userAndGroup = new StringBuilder(); ++ for (int i = 0; i < acls.length; i++) { ++ userAndGroup.append(PriorityACLConfig.values()[i] + "=" + acls[i].trim()) ++ .append(" "); ++ } ++ ++ aclString.append("[" + userAndGroup.toString().trim() + " " ++ + "max_priority=" + priority.getPriority() + " " + "default_priority=" ++ + defaultPriority.getPriority() + "]"); ++ ++ setAcl(queue, AccessType.ACCESS_PRIORITY, aclString.toString()); ++ } ++ ++ public List getPriorityAcls(String queue, ++ Priority clusterMaxPriority) { ++ String queuePrefix = getQueuePrefix(queue); ++ String defaultAcl = ALL_ACL; ++ String aclString = get( ++ queuePrefix + getAclKey(AccessType.ACCESS_PRIORITY), defaultAcl); ++ ++ return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString); ++ } ++ + public String[] getQueues(String queue) { + LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue)); + String[] queues = getStrings(getQueuePrefix(queue) + QUEUES); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +index c41a7bf..1a52eda 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +@@ -23,6 +23,7 @@ + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; + import org.apache.hadoop.yarn.api.records.NodeId; ++import org.apache.hadoop.yarn.api.records.Priority; + import org.apache.hadoop.yarn.api.records.Resource; + import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +@@ -83,4 +84,6 @@ + ResourceUsage getClusterResourceUsage(); + + ActivitiesManager getActivitiesManager(); ++ ++ Priority getMaxClusterLevelAppPriority(); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +index 7a6ce56..9d24904 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +@@ -42,6 +42,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; ++import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; + + /** + * +@@ -85,16 +86,20 @@ public CSQueue hook(CSQueue queue) { + private final Map queues = new ConcurrentHashMap<>(); + private CSQueue root; + private final RMNodeLabelsManager labelManager; ++ private AppPriorityACLsManager appPriorityACLManager; + + /** + * Construct the service. + * @param conf the configuration + * @param labelManager the labelManager ++ * @param appPriorityACLManager Priority ACL Manager + */ + public CapacitySchedulerQueueManager(Configuration conf, +- RMNodeLabelsManager labelManager) { ++ RMNodeLabelsManager labelManager, ++ AppPriorityACLsManager appPriorityACLManager) { + this.authorizer = YarnAuthorizationProvider.getInstance(conf); + this.labelManager = labelManager; ++ this.appPriorityACLManager = appPriorityACLManager; + } + + @Override +@@ -140,7 +145,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) + throws IOException { + root = parseQueue(this.csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); +- setQueueAcls(authorizer, queues); ++ setQueueAcls(authorizer, appPriorityACLManager, queues); + labelManager.reinitializeQueueLabels(getQueueToLabels()); + LOG.info("Initialized root queue " + root); + } +@@ -162,7 +167,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) + // Re-configure queues + root.reinitialize(newRoot, this.csContext.getClusterResource()); + +- setQueueAcls(authorizer, queues); ++ setQueueAcls(authorizer, appPriorityACLManager, queues); + + // Re-calculate headroom for active applications + Resource clusterResource = this.csContext.getClusterResource(); +@@ -298,12 +303,20 @@ private void addNewQueues( + * @throws IOException if fails to set queue acls + */ + public static void setQueueAcls(YarnAuthorizationProvider authorizer, +- Map queues) throws IOException { ++ AppPriorityACLsManager appPriorityACLManager, Map queues) ++ throws IOException { + List permissions = new ArrayList<>(); + for (CSQueue queue : queues.values()) { + AbstractCSQueue csQueue = (AbstractCSQueue) queue; + permissions.add( + new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); ++ ++ if (queue instanceof LeafQueue) { ++ LeafQueue lQueue = (LeafQueue) queue; ++ ++ appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(), ++ lQueue.getQueueName()); ++ } + } + authorizer.setPermission(permissions, + UserGroupInformation.getCurrentUser()); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +index 1c6471f..6524237 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +@@ -139,6 +139,9 @@ + private Map> ignorePartitionExclusivityRMContainers = + new ConcurrentHashMap<>(); + ++ List priorityAcls = ++ new ArrayList(); ++ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public LeafQueue(CapacitySchedulerContext cs, + String queueName, CSQueue parent, CSQueue old) throws IOException { +@@ -204,6 +207,9 @@ protected void setupQueueConfigs(Resource clusterResource) + conf.getMaximumApplicationMasterResourcePerQueuePercent( + getQueuePath()); + ++ priorityAcls = conf.getPriorityAcls(getQueuePath(), ++ scheduler.getMaxClusterLevelAppPriority()); ++ + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, + this.defaultLabelExpression, null)) { + throw new IOException( +@@ -496,6 +502,16 @@ private User getUserAndAddIfAbsent(String userName) { + } + } + ++ @Private ++ public List getPriorityACLs() { ++ try { ++ readLock.lock(); ++ return priorityAcls; ++ } finally { ++ readLock.unlock(); ++ } ++ } ++ + @Override + public void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java +new file mode 100644 +index 0000000..3d5f413 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java +@@ -0,0 +1,212 @@ ++/** ++* Licensed to the Apache Software Foundation (ASF) under one ++* or more contributor license agreements. See the NOTICE file ++* distributed with this work for additional information ++* regarding copyright ownership. The ASF licenses this file ++* to you under the Apache License, Version 2.0 (the ++* "License"); you may not use this file except in compliance ++* with the License. You may obtain a copy of the License at ++* ++* http://www.apache.org/licenses/LICENSE-2.0 ++* ++* Unless required by applicable law or agreed to in writing, software ++* distributed under the License is distributed on an "AS IS" BASIS, ++* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++* See the License for the specific language governing permissions and ++* limitations under the License. ++*/ ++ ++package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; ++ ++import java.util.ArrayList; ++import java.util.List; ++import java.util.regex.Matcher; ++import java.util.regex.Pattern; ++ ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.classification.InterfaceAudience.Private; ++import org.apache.hadoop.security.authorize.AccessControlList; ++import org.apache.hadoop.util.StringUtils; ++import org.apache.hadoop.yarn.api.records.Priority; ++ ++public class PriorityACLConfiguration { ++ ++ private static final Log LOG = LogFactory ++ .getLog(PriorityACLConfiguration.class); ++ ++ public enum PriorityACLConfig { ++ USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4); ++ ++ private final int id; ++ ++ PriorityACLConfig(int id) { ++ this.id = id; ++ } ++ ++ public int getId() { ++ return this.id; ++ } ++ } ++ ++ public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)"; ++ ++ @Private ++ public static final String ALL_ACL = "*"; ++ ++ @Private ++ public static final String NONE_ACL = " "; ++ ++ public List getPriorityAcl(Priority clusterMaxPriority, ++ String aclString) { ++ ++ List aclList = new ArrayList(); ++ Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL) ++ .matcher(aclString); ++ ++ /* ++ * Each ACL group will be separated by "[]". Syntax of each ACL group could ++ * be like below "user=b1,b2 group=g1 max-priority=a2 default-priority=a1" ++ * Ideally this means "for this given user/group, maximum possible priority ++ * is a2 and if the user has not specified any priority, then it is a1." ++ */ ++ while (matcher.find()) { ++ // Get the first ACL sub-group. ++ String aclSubGroup = matcher.group(1); ++ if (aclSubGroup.trim().isEmpty()) { ++ continue; ++ } ++ ++ /* ++ * Internal storage is PriorityACLGroup which stores each parsed priority ++ * ACLs group. This will help while looking for a user to priority mapping ++ * during app submission time. ACLs will be passed in below order only. 1. ++ * user/group 2. max-priority 3. default-priority ++ */ ++ PriorityACLGroup userPriorityACL = new PriorityACLGroup(); ++ ++ // userAndGroupName will hold user acl and group acl as interim storage ++ // since both user/group acl comes with separate key value pairs. ++ List userAndGroupName = new ArrayList<>(); ++ ++ for (String kvPair : aclSubGroup.trim().split(" +")) { ++ /* ++ * There are 3 possible options for key here: 1. user/group 2. ++ * max-priority 3. default-priority ++ */ ++ String[] splits = kvPair.split("="); ++ ++ // Ensure that each ACL sub string is key value pair separated by '='. ++ if (splits != null && splits.length > 1) { ++ parsePriorityACLType(userPriorityACL, splits, userAndGroupName); ++ } ++ } ++ ++ // If max_priority is higher to clusterMaxPriority, its better to ++ // handle here. ++ if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority ++ .getPriority()) { ++ LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority() ++ + "' is greater that cluster max priority. Resetting ACLs to " ++ + clusterMaxPriority); ++ userPriorityACL.setMaxPriority( ++ Priority.newInstance(clusterMaxPriority.getPriority())); ++ } ++ ++ AccessControlList acl = createACLStringForPriority(userAndGroupName); ++ userPriorityACL.setACLList(acl); ++ aclList.add(userPriorityACL); ++ } ++ ++ return aclList; ++ } ++ ++ /* ++ * Parse different types of ACLs sub parts for on priority group and store in ++ * a map for later processing. ++ */ ++ private void parsePriorityACLType(PriorityACLGroup userPriorityACL, ++ String[] splits, List userAndGroupName) { ++ // Here splits will have the key value pair at index 0 and 1 respectively. ++ // To parse all keys, its better to convert to PriorityACLConfig enum. ++ PriorityACLConfig aclType = PriorityACLConfig ++ .valueOf(StringUtils.toUpperCase(splits[0].trim())); ++ switch (aclType) { ++ case MAX_PRIORITY : ++ userPriorityACL ++ .setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1]))); ++ break; ++ case USER : ++ userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); ++ break; ++ case GROUP : ++ userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); ++ break; ++ case DEFAULT_PRIORITY : ++ int defaultPriority = Integer.parseInt(splits[1]); ++ Priority priority = (defaultPriority < 0) ++ ? Priority.newInstance(0) ++ : Priority.newInstance(defaultPriority); ++ userPriorityACL.setDefaultPriority(priority); ++ break; ++ } ++ } ++ ++ /* ++ * This method will help to append different types of ACLs keys against one ++ * priority. For eg,USER will be appended with GROUP as "user2,user4 group1". ++ */ ++ private AccessControlList createACLStringForPriority( ++ List acls) { ++ ++ String finalACL = ""; ++ String userACL = acls.get(0).toString(); ++ ++ // If any of user/group is *, consider it as acceptable for all. ++ // "user" is at index 0, and "group" is at index 1. ++ if (userACL.trim().equals(ALL_ACL)) { ++ finalACL = ALL_ACL; ++ } else if (userACL.equals(NONE_ACL)) { ++ finalACL = NONE_ACL; ++ } else { ++ ++ // Get USER segment ++ if (!userACL.trim().isEmpty()) { ++ // skip last appended "," ++ finalACL = acls.get(0).toString(); ++ } ++ ++ // Get GROUP segment if any ++ if (acls.size() > 1) { ++ String groupACL = acls.get(1).toString(); ++ if (!groupACL.trim().isEmpty()) { ++ finalACL = finalACL + " " ++ + acls.get(1).toString(); ++ } ++ } ++ } ++ ++ // Here ACL will look like "user1,user2 group" in ideal cases. ++ return new AccessControlList(finalACL.trim()); ++ } ++ ++ /* ++ * This method will help to append user/group acl string against given ++ * priority. For example "user1,user2 group1,group2" ++ */ ++ private StringBuilder getUserOrGroupACLStringFromConfig(String value) { ++ ++ // ACL strings could be generate for USER or GRUOP. ++ // aclList in map contains two entries. 1. USER, 2. GROUP. ++ StringBuilder aclTypeName = new StringBuilder(); ++ ++ if (value.trim().equals(ALL_ACL)) { ++ aclTypeName.setLength(0); ++ aclTypeName.append(ALL_ACL); ++ return aclTypeName; ++ } ++ ++ aclTypeName.append(value.trim()); ++ return aclTypeName; ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java +new file mode 100644 +index 0000000..877f9d5 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java +@@ -0,0 +1,89 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; ++ ++import org.apache.hadoop.security.authorize.AccessControlList; ++import org.apache.hadoop.yarn.api.records.Priority; ++ ++/** ++ * PriorityACLGroup will hold all ACL related information per priority. ++ * ++ */ ++public class PriorityACLGroup implements Comparable { ++ ++ private Priority maxPriority = null; ++ private Priority defaultPriority = null; ++ private AccessControlList aclList = null; ++ ++ public PriorityACLGroup(Priority maxPriority, Priority defaultPriority, ++ AccessControlList aclList) { ++ this.setMaxPriority(Priority.newInstance(maxPriority.getPriority())); ++ this.setDefaultPriority( ++ Priority.newInstance(defaultPriority.getPriority())); ++ this.setACLList(aclList); ++ } ++ ++ public PriorityACLGroup() { ++ } ++ ++ @Override ++ public int compareTo(PriorityACLGroup o) { ++ return getMaxPriority().compareTo(o.getMaxPriority()); ++ } ++ ++ @Override ++ public boolean equals(Object obj) { ++ if (this == obj) ++ return true; ++ if (obj == null) ++ return false; ++ if (getClass() != obj.getClass()) ++ return false; ++ PriorityACLGroup other = (PriorityACLGroup) obj; ++ if (getMaxPriority() != other.getMaxPriority()) ++ return false; ++ if (getDefaultPriority() != other.getDefaultPriority()) ++ return false; ++ return true; ++ } ++ ++ public Priority getMaxPriority() { ++ return maxPriority; ++ } ++ ++ public Priority getDefaultPriority() { ++ return defaultPriority; ++ } ++ ++ public AccessControlList getACLList() { ++ return aclList; ++ } ++ ++ public void setMaxPriority(Priority maxPriority) { ++ this.maxPriority = maxPriority; ++ } ++ ++ public void setDefaultPriority(Priority defaultPriority) { ++ this.defaultPriority = defaultPriority; ++ } ++ ++ public void setACLList(AccessControlList accessControlList) { ++ this.aclList = accessControlList; ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java +new file mode 100644 +index 0000000..ffe98bc +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java +@@ -0,0 +1,216 @@ ++/** ++* Licensed to the Apache Software Foundation (ASF) under one ++* or more contributor license agreements. See the NOTICE file ++* distributed with this work for additional information ++* regarding copyright ownership. The ASF licenses this file ++* to you under the Apache License, Version 2.0 (the ++* "License"); you may not use this file except in compliance ++* with the License. You may obtain a copy of the License at ++* ++* http://www.apache.org/licenses/LICENSE-2.0 ++* ++* Unless required by applicable law or agreed to in writing, software ++* distributed under the License is distributed on an "AS IS" BASIS, ++* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++* See the License for the specific language governing permissions and ++* limitations under the License. ++*/ ++ ++package org.apache.hadoop.yarn.server.resourcemanager.security; ++ ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.security.UserGroupInformation; ++import org.apache.hadoop.security.authorize.AccessControlList; ++import org.apache.hadoop.yarn.api.records.Priority; ++import org.apache.hadoop.yarn.conf.YarnConfiguration; ++import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PriorityACLGroup; ++ ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.List; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ConcurrentMap; ++ ++public class AppPriorityACLsManager { ++ ++ private static final Log LOG = LogFactory ++ .getLog(AppPriorityACLsManager.class); ++ ++ /* ++ * An internal class to store ACLs specific to each priority. This will be ++ * used to read and process acl's during app submission time as well. ++ */ ++ private static class PriorityACL { ++ private Priority priority; ++ private Priority defaultPriority; ++ private AccessControlList acl; ++ ++ PriorityACL(Priority priority, Priority defaultPriority, ++ AccessControlList acl) { ++ this.setPriority(priority); ++ this.setDefaultPriority(defaultPriority); ++ this.setAcl(acl); ++ } ++ ++ public Priority getPriority() { ++ return priority; ++ } ++ ++ public void setPriority(Priority priority) { ++ this.priority = priority; ++ } ++ ++ public Priority getDefaultPriority() { ++ return defaultPriority; ++ } ++ ++ public void setDefaultPriority(Priority priority) { ++ this.defaultPriority = priority; ++ } ++ ++ public AccessControlList getAcl() { ++ return acl; ++ } ++ ++ public void setAcl(AccessControlList acl) { ++ this.acl = acl; ++ } ++ } ++ ++ private boolean isACLsEnable; ++ private final ConcurrentMap> allAcls = ++ new ConcurrentHashMap<>(); ++ ++ public AppPriorityACLsManager(Configuration conf) { ++ this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, ++ YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); ++ } ++ ++ /** ++ * Each Queue could have configured with different priority acl's groups. This ++ * method helps to store each such ACL list against queue. ++ * ++ * @param priorityACLGroups ++ * List of Priority ACL Groups. ++ * @param queueName ++ * Queue Name associate with priority acl groups. ++ */ ++ public void addPrioirityACLs(List priorityACLGroups, ++ String queueName) { ++ ++ List priorityACL = allAcls.get(queueName); ++ if (null == priorityACL) { ++ priorityACL = new ArrayList(); ++ allAcls.put(queueName, priorityACL); ++ } ++ ++ // Ensure lowest priority PriorityACLGroup comes first in the list. ++ Collections.sort(priorityACLGroups); ++ ++ for (PriorityACLGroup priorityACLGroup : priorityACLGroups) { ++ priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(), ++ priorityACLGroup.getDefaultPriority(), ++ priorityACLGroup.getACLList())); ++ //if (LOG.isDebugEnabled()) { ++ LOG.info("Priority ACL group added: max-priority - " ++ + priorityACLGroup.getMaxPriority() + "default-priority - " ++ + priorityACLGroup.getDefaultPriority()); ++ // } ++ } ++ } ++ ++ /** ++ * Priority based checkAccess to ensure that given user has enough permission ++ * to submit application at a given priority level. ++ * ++ * @param callerUGI ++ * User who submits the application. ++ * @param queueName ++ * Queue to which application is submitted. ++ * @param submittedPriority ++ * priority of the application. ++ * @return True or False to indicate whether application can be submitted at ++ * submitted priority level or not. ++ */ ++ public boolean checkAccess(UserGroupInformation callerUGI, String queueName, ++ Priority submittedPriority) { ++ if (!isACLsEnable) { ++ return true; ++ } ++ ++ List acls = allAcls.get(queueName); ++ if (acls == null || acls.isEmpty()) { ++ return true; ++ } ++ ++ PriorityACL approvedPriorityACL = getPriorityPerUserACL(acls, ++ callerUGI, submittedPriority); ++ if (approvedPriorityACL == null) { ++ return false; ++ } ++ ++ return true; ++ } ++ ++ /** ++ * If an application is submitted without any priority, and submitted user has ++ * a default priority, this method helps to update this default priority as ++ * app's priority. ++ * ++ * @param queueName ++ * Submitted queue ++ * @param user ++ * User who submitted this application ++ * @return Default priority associated with given user. ++ */ ++ public Priority getDefaultPriority(String queueName, ++ UserGroupInformation user) { ++ if (!isACLsEnable) { ++ return null; ++ } ++ ++ List acls = allAcls.get(queueName); ++ if (acls == null || acls.isEmpty()) { ++ return null; ++ } ++ ++ PriorityACL approvedPriorityACL = getPriorityPerUserACL(acls, user, ++ null); ++ if (approvedPriorityACL == null) { ++ return null; ++ } ++ ++ Priority defaultPriority = Priority ++ .newInstance(approvedPriorityACL.getDefaultPriority().getPriority()); ++ return defaultPriority; ++ } ++ ++ private PriorityACL getPriorityPerUserACL(List acls , ++ UserGroupInformation user, Priority submittedPriority) { ++ ++ // Iterate through all configured ACLs starting from lower priority. ++ // If user is found corresponding to a configured priority, then store ++ // that entry. if failed, continue iterate through whole acl list. ++ PriorityACL selectedAcl = null; ++ for (PriorityACL entry : acls) { ++ AccessControlList list = entry.getAcl(); ++ ++ if (list.isUserAllowed(user)) { ++ selectedAcl = entry; ++ ++ // If submittedPriority is passed through the argument, also check ++ // whether submittedPriority is under max-priority of each ACL group. ++ if (submittedPriority != null) { ++ selectedAcl = null; ++ if (submittedPriority.getPriority() <= entry.getPriority() ++ .getPriority()) { ++ return entry; ++ } ++ } ++ } ++ } ++ return selectedAcl; ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +index e661703..fbd5ac3 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +@@ -43,6 +43,8 @@ + protected static final String COMMON_USER = "common_user"; + protected static final String QUEUE_A_USER = "queueA_user"; + protected static final String QUEUE_B_USER = "queueB_user"; ++ protected static final String QUEUE_A_GROUP = "queueA_group"; ++ protected static final String QUEUE_B_GROUP = "queueB_group"; + protected static final String ROOT_ADMIN = "root_admin"; + protected static final String QUEUE_A_ADMIN = "queueA_admin"; + protected static final String QUEUE_B_ADMIN = "queueB_admin"; +@@ -53,7 +55,7 @@ + + protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class); + +- MockRM resourceManager; ++ protected MockRM resourceManager; + Configuration conf; + YarnRPC rpc; + InetSocketAddress rmAddress; +@@ -68,6 +70,7 @@ public void setup() throws InterruptedException, IOException { + + AccessControlList adminACL = new AccessControlList(""); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString()); ++ conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + + resourceManager = new MockRM(conf) { + protected ClientRMService createClientRMService() { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +index 00466ae..23bed22 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +@@ -29,6 +29,7 @@ + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; + import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; + import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +@@ -463,7 +464,9 @@ public void testPriorityInAllocatedResponse() throws Exception { + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); +- rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(), ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app1.getUser()); ++ rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(), + appPriority2); + + AllocateResponse response2 = am1.allocate(allocateRequest); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +index cb57f39..79e481f 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +@@ -1120,7 +1120,7 @@ private static YarnScheduler mockYarnScheduler() throws YarnException { + when(yarnScheduler.getResourceCalculator()).thenReturn(rs); + + when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class), +- anyString(), anyString(), any(ApplicationId.class))) ++ any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) + .thenReturn(Priority.newInstance(0)); + return yarnScheduler; + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +index 164ca20..ff52efd 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +@@ -29,6 +29,7 @@ + import java.util.Map; + + import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; + import org.apache.hadoop.yarn.api.records.ApplicationId; + import org.apache.hadoop.yarn.api.records.Container; +@@ -344,7 +345,10 @@ public void testUpdatePriorityAtRuntime() throws Exception { + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); +- cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app1.getUser()); ++ cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, ++ ugi); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() +@@ -378,7 +382,10 @@ public void testUpdateInvalidPriorityAtRuntime() throws Exception { + + // Change the priority of App1 to 15 + Priority appPriority2 = Priority.newInstance(15); +- cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app1.getUser()); ++ cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, ++ ugi); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() +@@ -428,7 +435,10 @@ public void testRMRestartWithChangeInPriority() throws Exception { + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); +- cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app1.getUser()); ++ cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, ++ ugi); + + // let things settle down + Thread.sleep(1000); +@@ -557,7 +567,10 @@ public void testApplicationPriorityAllocationWithChangeInPriority() + + // Change the priority of App1 to 3 (lowest) + Priority appPriority3 = Priority.newInstance(3); +- cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null); ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app2.getUser()); ++ cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null, ++ ugi); + + // add request for containers App2 + am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList()); +@@ -788,8 +801,10 @@ private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, + int appsPendingExpected, int activeAppsExpected, RMApp app) + throws YarnException { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); ++ UserGroupInformation ugi = UserGroupInformation ++ .createRemoteUser(app.getUser()); + cs.updateApplicationPriority(Priority.newInstance(2), +- app.getApplicationId(), null); ++ app.getApplicationId(), null, ugi); + SchedulerEvent removeAttempt; + removeAttempt = new AppAttemptRemovedSchedulerEvent( + app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java +new file mode 100644 +index 0000000..5eadeb1 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java +@@ -0,0 +1,120 @@ ++/** ++* Licensed to the Apache Software Foundation (ASF) under one ++* or more contributor license agreements. See the NOTICE file ++* distributed with this work for additional information ++* regarding copyright ownership. The ASF licenses this file ++* to you under the Apache License, Version 2.0 (the ++* "License"); you may not use this file except in compliance ++* with the License. You may obtain a copy of the License at ++* ++* http://www.apache.org/licenses/LICENSE-2.0 ++* ++* Unless required by applicable law or agreed to in writing, software ++* distributed under the License is distributed on an "AS IS" BASIS, ++* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++* See the License for the specific language governing permissions and ++* limitations under the License. ++*/ ++package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; ++ ++import java.util.List; ++ ++import org.apache.hadoop.yarn.api.records.Priority; ++import org.junit.Assert; ++import org.junit.Test; ++ ++ ++public class TestApplicationPriorityACLConfiguration { ++ ++ private final int defaultPriorityQueueA = 3; ++ private final int defaultPriorityQueueB = -1; ++ private final int maxPriorityQueueA = 5; ++ private final int maxPriorityQueueB = 10; ++ private final int clusterMaxPriority = 10; ++ ++ private static final String QUEUE_A_USER = "queueA_user"; ++ private static final String QUEUE_B_USER = "queueB_user"; ++ private static final String QUEUE_A_GROUP = "queueA_group"; ++ ++ private static final String QUEUEA = "queueA"; ++ private static final String QUEUEB = "queueB"; ++ private static final String QUEUEC = "queueC"; ++ ++ @Test ++ public void testSimpleACLConfiguration() throws Exception { ++ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); ++ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, ++ new String[]{QUEUEA, QUEUEB, QUEUEC}); ++ ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); ++ ++ // Success case: Configure one user/group level priority acl for queue A. ++ String[] aclsForA = new String[2]; ++ aclsForA[0] = QUEUE_A_USER; ++ aclsForA[1] = QUEUE_A_GROUP; ++ csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, ++ Priority.newInstance(maxPriorityQueueA), ++ Priority.newInstance(defaultPriorityQueueA), aclsForA); ++ ++ // Try to get the ACL configs and make sure there are errors/exceptions ++ List pGroupA = csConf.getPriorityAcls( ++ CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, ++ Priority.newInstance(clusterMaxPriority)); ++ ++ // Validate! ++ verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, ++ defaultPriorityQueueA); ++ } ++ ++ @Test ++ public void testACLConfigurationForInvalidCases() throws Exception { ++ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); ++ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, ++ new String[]{QUEUEA, QUEUEB, QUEUEC}); ++ ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); ++ ++ // Success case: Configure one user/group level priority acl for queue A. ++ String[] aclsForA = new String[2]; ++ aclsForA[0] = QUEUE_A_USER; ++ aclsForA[1] = QUEUE_A_GROUP; ++ csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, ++ Priority.newInstance(maxPriorityQueueA), ++ Priority.newInstance(defaultPriorityQueueA), aclsForA); ++ ++ String[] aclsForB = new String[1]; ++ aclsForB[0] = QUEUE_B_USER; ++ csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, ++ Priority.newInstance(maxPriorityQueueB), ++ Priority.newInstance(defaultPriorityQueueB), aclsForB); ++ ++ // Try to get the ACL configs and make sure there are errors/exceptions ++ List pGroupA = csConf.getPriorityAcls( ++ CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, ++ Priority.newInstance(clusterMaxPriority)); ++ List pGroupB = csConf.getPriorityAcls( ++ CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, ++ Priority.newInstance(clusterMaxPriority)); ++ ++ // Validate stored ACL values with configured ones. ++ verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, ++ defaultPriorityQueueA); ++ verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0); ++ } ++ ++ private void verifyACLs(List pGroup, String queueUser, ++ String queueGroup, int maxPriority, int defaultPriority) { ++ PriorityACLGroup group = pGroup.get(0); ++ String aclString = queueUser + " " + queueGroup; ++ ++ Assert.assertEquals(aclString.trim(), ++ group.getACLList().getAclString().trim()); ++ Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority()); ++ Assert.assertEquals(defaultPriority, ++ group.getDefaultPriority().getPriority()); ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java +new file mode 100644 +index 0000000..b41ba83 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java +@@ -0,0 +1,206 @@ ++/** ++* Licensed to the Apache Software Foundation (ASF) under one ++* or more contributor license agreements. See the NOTICE file ++* distributed with this work for additional information ++* regarding copyright ownership. The ASF licenses this file ++* to you under the Apache License, Version 2.0 (the ++* "License"); you may not use this file except in compliance ++* with the License. You may obtain a copy of the License at ++* ++* http://www.apache.org/licenses/LICENSE-2.0 ++* ++* Unless required by applicable law or agreed to in writing, software ++* distributed under the License is distributed on an "AS IS" BASIS, ++* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++* See the License for the specific language governing permissions and ++* limitations under the License. ++*/ ++package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; ++ ++ ++import java.io.IOException; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.ipc.RemoteException; ++import org.apache.hadoop.yarn.api.ApplicationClientProtocol; ++import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; ++import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; ++import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; ++import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; ++import org.apache.hadoop.yarn.api.records.ApplicationId; ++import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; ++import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; ++import org.apache.hadoop.yarn.api.records.Priority; ++import org.apache.hadoop.yarn.api.records.Resource; ++import org.apache.hadoop.yarn.conf.YarnConfiguration; ++import org.apache.hadoop.yarn.exceptions.YarnException; ++import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase; ++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; ++import org.apache.hadoop.yarn.server.utils.BuilderUtils; ++import org.junit.Assert; ++import org.junit.Test; ++ ++ ++public class TestApplicationPriorityACLs extends ACLsTestBase { ++ ++ private final int defaultPriorityQueueA = 3; ++ private final int defaultPriorityQueueB = 10; ++ private final int maxPriorityQueueA = 5; ++ private final int maxPriorityQueueB = 11; ++ private final int clusterMaxPriority = 10; ++ ++ @Test ++ public void testApplicationACLs() throws Exception { ++ ++ /* ++ * Cluster Max-priority is 10. User 'queueA_user' has permission to submit ++ * apps only at priority 5. Default priority for this user is 3. ++ */ ++ ++ // Case 1: App will be submitted with priority 5. ++ verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5); ++ ++ // Case 2: App will be rejected as submitted priority was 6. ++ verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6); ++ ++ // Case 3: App will be submitted w/o priority, hence consider default 3. ++ verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1); ++ ++ // Case 4: App will be submitted with priority 11. ++ verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11); ++ } ++ ++ private void verifyAppSubmitWithPrioritySuccess(String submitter, ++ String queueName, int priority) throws Exception { ++ Priority appPriority = null; ++ if (priority > 0) { ++ appPriority = Priority.newInstance(priority); ++ } else { ++ // RM will consider default priority for the submitted user. So update ++ // priority to the default value to compare. ++ priority = defaultPriorityQueueA; ++ } ++ ++ ApplicationSubmissionContext submissionContext = prepareForAppSubmission( ++ submitter, queueName, appPriority); ++ submitAppToRMWithValidAcl(submitter, submissionContext); ++ ++ // Ideally get app report here and check the priority. ++ verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(), ++ priority); ++ } ++ ++ private void verifyAppSubmitWithPriorityFailure(String submitter, ++ String queueName, int priority) throws Exception { ++ Priority appPriority = Priority.newInstance(priority); ++ ApplicationSubmissionContext submissionContext = prepareForAppSubmission( ++ submitter, queueName, appPriority); ++ submitAppToRMWithInValidAcl(submitter, submissionContext); ++ } ++ ++ private ApplicationSubmissionContext prepareForAppSubmission(String submitter, ++ String queueName, Priority priority) throws Exception { ++ ++ GetNewApplicationRequest newAppRequest = GetNewApplicationRequest ++ .newInstance(); ++ ++ ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); ++ ApplicationId applicationId = submitterClient ++ .getNewApplication(newAppRequest).getApplicationId(); ++ ++ Resource resource = BuilderUtils.newResource(1024, 1); ++ ++ ContainerLaunchContext amContainerSpec = ContainerLaunchContext ++ .newInstance(null, null, null, null, null, null); ++ ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext ++ .newInstance(applicationId, "applicationName", queueName, null, ++ amContainerSpec, false, true, 1, resource, "applicationType"); ++ appSubmissionContext.setApplicationId(applicationId); ++ appSubmissionContext.setQueue(queueName); ++ if (null != priority) { ++ appSubmissionContext.setPriority(priority); ++ } ++ ++ return appSubmissionContext; ++ } ++ ++ private void submitAppToRMWithValidAcl(String submitter, ++ ApplicationSubmissionContext appSubmissionContext) ++ throws YarnException, IOException, InterruptedException { ++ ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); ++ SubmitApplicationRequest submitRequest = SubmitApplicationRequest ++ .newInstance(appSubmissionContext); ++ submitterClient.submitApplication(submitRequest); ++ resourceManager.waitForState(appSubmissionContext.getApplicationId(), ++ RMAppState.ACCEPTED); ++ } ++ ++ private void submitAppToRMWithInValidAcl(String submitter, ++ ApplicationSubmissionContext appSubmissionContext) ++ throws YarnException, IOException, InterruptedException { ++ ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); ++ SubmitApplicationRequest submitRequest = SubmitApplicationRequest ++ .newInstance(appSubmissionContext); ++ try { ++ submitterClient.submitApplication(submitRequest); ++ } catch (YarnException ex) { ++ Assert.assertTrue(ex.getCause() instanceof RemoteException); ++ } ++ } ++ ++ private void verifyAppPriorityIsAccepted(String submitter, ++ ApplicationId applicationId, int priority) ++ throws IOException, InterruptedException { ++ ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); ++ ++ /** ++ * If priority is greater than cluster max, RM will auto set to cluster max ++ * Consider this scenario as a special case. ++ */ ++ if (priority > clusterMaxPriority) { ++ priority = clusterMaxPriority; ++ } ++ ++ GetApplicationReportRequest request = GetApplicationReportRequest ++ .newInstance(applicationId); ++ try { ++ GetApplicationReportResponse response = submitterClient ++ .getApplicationReport(request); ++ Assert.assertEquals(response.getApplicationReport().getPriority(), ++ Priority.newInstance(priority)); ++ } catch (YarnException e) { ++ Assert.fail("Application submission should not fail."); ++ } ++ } ++ ++ @Override ++ protected Configuration createConfiguration() { ++ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); ++ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, ++ new String[]{QUEUEA, QUEUEB, QUEUEC}); ++ ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); ++ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); ++ ++ String[] aclsForA = new String[2]; ++ aclsForA[0] = QUEUE_A_USER; ++ aclsForA[1] = QUEUE_A_GROUP; ++ csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, ++ Priority.newInstance(maxPriorityQueueA), ++ Priority.newInstance(defaultPriorityQueueA), aclsForA); ++ ++ String[] aclsForB = new String[2]; ++ aclsForB[0] = QUEUE_B_USER; ++ aclsForB[1] = QUEUE_B_GROUP; ++ csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, ++ Priority.newInstance(maxPriorityQueueB), ++ Priority.newInstance(defaultPriorityQueueB), aclsForB); ++ ++ csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); ++ csConf.set(YarnConfiguration.RM_SCHEDULER, ++ CapacityScheduler.class.getName()); ++ ++ return csConf; ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +index a36db44..1348f51 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +@@ -55,6 +55,7 @@ + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; ++import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; + import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; + import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + import org.apache.hadoop.yarn.util.resource.Resources; +@@ -856,7 +857,10 @@ public void testQueueAcl() throws Exception { + TestUtils.spyHook); + YarnAuthorizationProvider authorizer = + YarnAuthorizationProvider.getInstance(conf); +- CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues); ++ AppPriorityACLsManager appPriorityACLManager = new AppPriorityACLsManager( ++ conf); ++ CapacitySchedulerQueueManager.setQueueAcls(authorizer, ++ appPriorityACLManager, queues); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + // Setup queue configs +-- +2.7.4 (Apple Git-66) + diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 79f934c..e3a757c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -964,7 +964,7 @@ protected void completedContainerInternal(RMContainer rmContainer, @Override public Priority checkAndGetApplicationPriority(Priority priority, - String user, String queueName, ApplicationId applicationId) + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException { // TODO Dummy implementation. return Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java index 32459b9..f60caf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AccessType.java @@ -30,4 +30,6 @@ // queue SUBMIT_APP, ADMINISTER_QUEUE, + // application + ACCESS_PRIORITY, } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 6ac726e..75f4c0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -98,6 +98,15 @@ + yarn.scheduler.capacity.root.default.acl_access_priority + * + + The ACL of who can submit applications with configured priority. + For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] + + + + yarn.scheduler.capacity.node-locality-delay 40 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 3dc7e38..0f56a2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1614,7 +1614,8 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } try { - rmAppManager.updateApplicationPriority(applicationId, newAppPriority); + rmAppManager.updateApplicationPriority(callerUGI, applicationId, + newAppPriority); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4d628ee..5069d2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -350,19 +350,19 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext, user); } } - + ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); // Verify and get the update application priority and set back to // submissionContext + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); Priority appPriority = scheduler.checkAndGetApplicationPriority( - submissionContext.getPriority(), user, submissionContext.getQueue(), + submissionContext.getPriority(), userUgi, submissionContext.getQueue(), applicationId); submissionContext.setPriority(appPriority); - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); // Since FairScheduler queue mapping is done inside scheduler, // if FairScheduler is used and the queue doesn't exist, we should not // fail here because queue will be created inside FS. Ideally, FS queue @@ -561,12 +561,14 @@ public void updateApplicationTimeout(RMApp app, /** * updateApplicationPriority will invoke scheduler api to update the * new priority to RM and StateStore. + * @param callerUGI user * @param applicationId Application Id * @param newAppPriority proposed new application priority * @throws YarnException Handle exceptions */ - public void updateApplicationPriority(ApplicationId applicationId, - Priority newAppPriority) throws YarnException { + public void updateApplicationPriority(UserGroupInformation callerUGI, + ApplicationId applicationId, Priority newAppPriority) + throws YarnException { RMApp app = this.rmContext.getRMApps().get(applicationId); synchronized (applicationId) { @@ -579,8 +581,8 @@ public void updateApplicationPriority(ApplicationId applicationId, // Invoke scheduler api to update priority in scheduler and to // State Store. - Priority appPriority = rmContext.getScheduler() - .updateApplicationPriority(newAppPriority, applicationId, future); + Priority appPriority = rmContext.getScheduler().updateApplicationPriority( + newAppPriority, applicationId, future, callerUGI); if (app.getApplicationPriority().equals(appPriority)) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c0cc6b0..17f4cd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -779,8 +780,8 @@ protected void refreshMaximumAllocation(Resource newMaxAlloc) { } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) + public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. @@ -789,8 +790,8 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) - throws YarnException { + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException { // Dummy Implementation till Application Priority changes are done in // specific scheduler. return Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index ea1ae60..608f275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -311,7 +311,7 @@ void setEntitlement(String queue, QueueEntitlement entitlement) * Verify whether a submitted application priority is valid as per configured * Queue * - * @param priorityFromContext + * @param priorityRequestedByApp * Submitted Application priority. * @param user * User who submitted the Application @@ -321,8 +321,8 @@ void setEntitlement(String queue, QueueEntitlement entitlement) * Application ID * @return Updated Priority from scheduler */ - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) + public Priority checkAndGetApplicationPriority(Priority priorityRequestedByApp, + UserGroupInformation user, String queueName, ApplicationId applicationId) throws YarnException; /** @@ -334,12 +334,13 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, * @param applicationId Application ID * * @param future Sets any type of exception happened from StateStore + * @param user who submitted the application * * @return updated priority */ public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) - throws YarnException; + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException; /** * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9a73a65..d39ca7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -224,6 +225,7 @@ public Configuration getConf() { private List asyncSchedulerThreads; private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; /** * EXPERT @@ -305,8 +307,9 @@ void initScheduler(Configuration configuration) throws this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); + this.appPriorityACLManager = new AppPriorityACLsManager(conf); this.queueManager = new CapacitySchedulerQueueManager(yarnConf, - this.labelManager); + this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); this.activitiesManager = new ActivitiesManager(rmContext); @@ -2180,45 +2183,57 @@ private String handleMoveToPlanQueue(String targetQueueName) { } @Override - public Priority checkAndGetApplicationPriority(Priority priorityFromContext, - String user, String queueName, ApplicationId applicationId) - throws YarnException { - Priority appPriority = null; - - // ToDo: Verify against priority ACLs + public Priority checkAndGetApplicationPriority( + Priority priorityRequestedByApp, UserGroupInformation user, + String queueName, ApplicationId applicationId) throws YarnException { + Priority appPriority = priorityRequestedByApp; // Verify the scenario where priority is null from submissionContext. - if (null == priorityFromContext) { + if (null == appPriority) { + // Verify whether submitted user has any default priority set. If so, + // user's default priority will get precedence over queue default. + // for updateApplicationPriority call flow, this check is done in + // CientRMService itself. + appPriority = this.appPriorityACLManager.getDefaultPriority(queueName, + user); + // Get the default priority for the Queue. If Queue is non-existent, then - // use default priority - priorityFromContext = this.queueManager.getDefaultPriorityForQueue( - queueName); + // use default priority. Do it only if user doesnt have any default. + if (null == appPriority) { + appPriority = this.queueManager.getDefaultPriorityForQueue(queueName); + } - LOG.info("Application '" + applicationId - + "' is submitted without priority " - + "hence considering default queue/cluster priority: " - + priorityFromContext.getPriority()); + LOG.info( + "Application '" + applicationId + "' is submitted without priority " + + "hence considering default queue/cluster priority: " + + appPriority.getPriority()); } // Verify whether submitted priority is lesser than max priority // in the cluster. If it is out of found, defining a max cap. - if (priorityFromContext.compareTo(getMaxClusterLevelAppPriority()) < 0) { - priorityFromContext = Priority + if (appPriority.compareTo(getMaxClusterLevelAppPriority()) < 0) { + appPriority = Priority .newInstance(getMaxClusterLevelAppPriority().getPriority()); } - appPriority = priorityFromContext; + // Lets check for ACLs here. + if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) { + throw new YarnException(new AccessControlException( + "User " + user + " does not have permission to submit/update " + + applicationId + " for " + appPriority)); + } - LOG.info("Priority '" + appPriority.getPriority() - + "' is acceptable in queue : " + queueName + " for application: " - + applicationId + " for the user: " + user); + LOG.info( + "Priority '" + appPriority.getPriority() + "' is acceptable in queue : " + + queueName + " for application: " + applicationId); return appPriority; } @Override public Priority updateApplicationPriority(Priority newPriority, - ApplicationId applicationId, SettableFuture future) + ApplicationId applicationId, SettableFuture future, + UserGroupInformation user) throws YarnException { Priority appPriority = null; SchedulerApplication application = applications @@ -2231,7 +2246,7 @@ public Priority updateApplicationPriority(Priority newPriority, RMApp rmApp = rmContext.getRMApps().get(applicationId); - appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), + appPriority = checkAndGetApplicationPriority(newPriority, user, rmApp.getQueue(), applicationId); if (application.getPriority().equals(appPriority)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfaeba4..4ed2155 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationACL; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PriorityACLConfiguration.PriorityACLConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -63,7 +65,7 @@ private static final Log LOG = LogFactory.getLog(CapacitySchedulerConfiguration.class); - + private static final String CS_CONFIGURATION_FILE = "capacity-scheduler.xml"; @Private @@ -274,6 +276,8 @@ @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + PriorityACLConfiguration priorityACLConfig = new PriorityACLConfiguration(); + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -602,6 +606,10 @@ private static String getAclKey(ReservationACL acl) { return "acl_" + StringUtils.toLowerCase(acl.toString()); } + private static String getAclKey(AccessType acl) { + return "acl_" + StringUtils.toLowerCase(acl.toString()); + } + @Override public Map getReservationAcls(String queue) { @@ -627,6 +635,11 @@ private void setAcl(String queue, ReservationACL acl, String aclString) { set(queuePrefix + getAclKey(acl), aclString); } + private void setAcl(String queue, AccessType acl, String aclString) { + String queuePrefix = getQueuePrefix(queue); + set(queuePrefix + getAclKey(acl), aclString); + } + public Map getAcls(String queue) { Map acls = new HashMap(); @@ -650,6 +663,34 @@ public void setReservationAcls(String queue, } } + @VisibleForTesting + public void setPriorityAcls(String queue, Priority priority, + Priority defaultPriority, String[] acls) { + StringBuilder aclString = new StringBuilder(); + + StringBuilder userAndGroup = new StringBuilder(); + for (int i = 0; i < acls.length; i++) { + userAndGroup.append(PriorityACLConfig.values()[i] + "=" + acls[i].trim()) + .append(" "); + } + + aclString.append("[" + userAndGroup.toString().trim() + " " + + "max_priority=" + priority.getPriority() + " " + "default_priority=" + + defaultPriority.getPriority() + "]"); + + setAcl(queue, AccessType.ACCESS_PRIORITY, aclString.toString()); + } + + public List getPriorityAcls(String queue, + Priority clusterMaxPriority) { + String queuePrefix = getQueuePrefix(queue); + String defaultAcl = ALL_ACL; + String aclString = get( + queuePrefix + getAclKey(AccessType.ACCESS_PRIORITY), defaultAcl); + + return priorityACLConfig.getPriorityAcl(clusterMaxPriority, aclString); + } + public String[] getQueues(String queue) { LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue)); String[] queues = getStrings(getQueuePrefix(queue) + QUEUES); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index c41a7bf..1a52eda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -83,4 +84,6 @@ ResourceUsage getClusterResourceUsage(); ActivitiesManager getActivitiesManager(); + + Priority getMaxClusterLevelAppPriority(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7a6ce56..9d24904 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; /** * @@ -85,16 +86,20 @@ public CSQueue hook(CSQueue queue) { private final Map queues = new ConcurrentHashMap<>(); private CSQueue root; private final RMNodeLabelsManager labelManager; + private AppPriorityACLsManager appPriorityACLManager; /** * Construct the service. * @param conf the configuration * @param labelManager the labelManager + * @param appPriorityACLManager Priority ACL Manager */ public CapacitySchedulerQueueManager(Configuration conf, - RMNodeLabelsManager labelManager) { + RMNodeLabelsManager labelManager, + AppPriorityACLsManager appPriorityACLManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; + this.appPriorityACLManager = appPriorityACLManager; } @Override @@ -140,7 +145,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); LOG.info("Initialized root queue " + root); } @@ -162,7 +167,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) // Re-configure queues root.reinitialize(newRoot, this.csContext.getClusterResource()); - setQueueAcls(authorizer, queues); + setQueueAcls(authorizer, appPriorityACLManager, queues); // Re-calculate headroom for active applications Resource clusterResource = this.csContext.getClusterResource(); @@ -298,12 +303,20 @@ private void addNewQueues( * @throws IOException if fails to set queue acls */ public static void setQueueAcls(YarnAuthorizationProvider authorizer, - Map queues) throws IOException { + AppPriorityACLsManager appPriorityACLManager, Map queues) + throws IOException { List permissions = new ArrayList<>(); for (CSQueue queue : queues.values()) { AbstractCSQueue csQueue = (AbstractCSQueue) queue; permissions.add( new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); + + if (queue instanceof LeafQueue) { + LeafQueue lQueue = (LeafQueue) queue; + + appPriorityACLManager.addPrioirityACLs(lQueue.getPriorityACLs(), + lQueue.getQueueName()); + } } authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1c6471f..6524237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -139,6 +139,9 @@ private Map> ignorePartitionExclusivityRMContainers = new ConcurrentHashMap<>(); + List priorityAcls = + new ArrayList(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -204,6 +207,9 @@ protected void setupQueueConfigs(Resource clusterResource) conf.getMaximumApplicationMasterResourcePerQueuePercent( getQueuePath()); + priorityAcls = conf.getPriorityAcls(getQueuePath(), + scheduler.getMaxClusterLevelAppPriority()); + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, this.defaultLabelExpression, null)) { throw new IOException( @@ -496,6 +502,16 @@ private User getUserAndAddIfAbsent(String userName) { } } + @Private + public List getPriorityACLs() { + try { + readLock.lock(); + return priorityAcls; + } finally { + readLock.unlock(); + } + } + @Override public void reinitialize( CSQueue newlyParsedQueue, Resource clusterResource) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java new file mode 100644 index 0000000..3d5f413 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLConfiguration.java @@ -0,0 +1,212 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Priority; + +public class PriorityACLConfiguration { + + private static final Log LOG = LogFactory + .getLog(PriorityACLConfiguration.class); + + public enum PriorityACLConfig { + USER(1), GROUP(2), MAX_PRIORITY(3), DEFAULT_PRIORITY(4); + + private final int id; + + PriorityACLConfig(int id) { + this.id = id; + } + + public int getId() { + return this.id; + } + } + + public static final String PATTERN_FOR_PRIORITY_ACL = "\\[([^\\]]+)"; + + @Private + public static final String ALL_ACL = "*"; + + @Private + public static final String NONE_ACL = " "; + + public List getPriorityAcl(Priority clusterMaxPriority, + String aclString) { + + List aclList = new ArrayList(); + Matcher matcher = Pattern.compile(PATTERN_FOR_PRIORITY_ACL) + .matcher(aclString); + + /* + * Each ACL group will be separated by "[]". Syntax of each ACL group could + * be like below "user=b1,b2 group=g1 max-priority=a2 default-priority=a1" + * Ideally this means "for this given user/group, maximum possible priority + * is a2 and if the user has not specified any priority, then it is a1." + */ + while (matcher.find()) { + // Get the first ACL sub-group. + String aclSubGroup = matcher.group(1); + if (aclSubGroup.trim().isEmpty()) { + continue; + } + + /* + * Internal storage is PriorityACLGroup which stores each parsed priority + * ACLs group. This will help while looking for a user to priority mapping + * during app submission time. ACLs will be passed in below order only. 1. + * user/group 2. max-priority 3. default-priority + */ + PriorityACLGroup userPriorityACL = new PriorityACLGroup(); + + // userAndGroupName will hold user acl and group acl as interim storage + // since both user/group acl comes with separate key value pairs. + List userAndGroupName = new ArrayList<>(); + + for (String kvPair : aclSubGroup.trim().split(" +")) { + /* + * There are 3 possible options for key here: 1. user/group 2. + * max-priority 3. default-priority + */ + String[] splits = kvPair.split("="); + + // Ensure that each ACL sub string is key value pair separated by '='. + if (splits != null && splits.length > 1) { + parsePriorityACLType(userPriorityACL, splits, userAndGroupName); + } + } + + // If max_priority is higher to clusterMaxPriority, its better to + // handle here. + if (userPriorityACL.getMaxPriority().getPriority() > clusterMaxPriority + .getPriority()) { + LOG.warn("ACL configuration for '" + userPriorityACL.getMaxPriority() + + "' is greater that cluster max priority. Resetting ACLs to " + + clusterMaxPriority); + userPriorityACL.setMaxPriority( + Priority.newInstance(clusterMaxPriority.getPriority())); + } + + AccessControlList acl = createACLStringForPriority(userAndGroupName); + userPriorityACL.setACLList(acl); + aclList.add(userPriorityACL); + } + + return aclList; + } + + /* + * Parse different types of ACLs sub parts for on priority group and store in + * a map for later processing. + */ + private void parsePriorityACLType(PriorityACLGroup userPriorityACL, + String[] splits, List userAndGroupName) { + // Here splits will have the key value pair at index 0 and 1 respectively. + // To parse all keys, its better to convert to PriorityACLConfig enum. + PriorityACLConfig aclType = PriorityACLConfig + .valueOf(StringUtils.toUpperCase(splits[0].trim())); + switch (aclType) { + case MAX_PRIORITY : + userPriorityACL + .setMaxPriority(Priority.newInstance(Integer.parseInt(splits[1]))); + break; + case USER : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case GROUP : + userAndGroupName.add(getUserOrGroupACLStringFromConfig(splits[1])); + break; + case DEFAULT_PRIORITY : + int defaultPriority = Integer.parseInt(splits[1]); + Priority priority = (defaultPriority < 0) + ? Priority.newInstance(0) + : Priority.newInstance(defaultPriority); + userPriorityACL.setDefaultPriority(priority); + break; + } + } + + /* + * This method will help to append different types of ACLs keys against one + * priority. For eg,USER will be appended with GROUP as "user2,user4 group1". + */ + private AccessControlList createACLStringForPriority( + List acls) { + + String finalACL = ""; + String userACL = acls.get(0).toString(); + + // If any of user/group is *, consider it as acceptable for all. + // "user" is at index 0, and "group" is at index 1. + if (userACL.trim().equals(ALL_ACL)) { + finalACL = ALL_ACL; + } else if (userACL.equals(NONE_ACL)) { + finalACL = NONE_ACL; + } else { + + // Get USER segment + if (!userACL.trim().isEmpty()) { + // skip last appended "," + finalACL = acls.get(0).toString(); + } + + // Get GROUP segment if any + if (acls.size() > 1) { + String groupACL = acls.get(1).toString(); + if (!groupACL.trim().isEmpty()) { + finalACL = finalACL + " " + + acls.get(1).toString(); + } + } + } + + // Here ACL will look like "user1,user2 group" in ideal cases. + return new AccessControlList(finalACL.trim()); + } + + /* + * This method will help to append user/group acl string against given + * priority. For example "user1,user2 group1,group2" + */ + private StringBuilder getUserOrGroupACLStringFromConfig(String value) { + + // ACL strings could be generate for USER or GRUOP. + // aclList in map contains two entries. 1. USER, 2. GROUP. + StringBuilder aclTypeName = new StringBuilder(); + + if (value.trim().equals(ALL_ACL)) { + aclTypeName.setLength(0); + aclTypeName.append(ALL_ACL); + return aclTypeName; + } + + aclTypeName.append(value.trim()); + return aclTypeName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java new file mode 100644 index 0000000..2c90f84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PriorityACLGroup.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; + +/** + * PriorityACLGroup will hold all ACL related information per priority. + * + */ +public class PriorityACLGroup implements Comparable { + + private Priority maxPriority = null; + private Priority defaultPriority = null; + private AccessControlList aclList = null; + + public PriorityACLGroup(Priority maxPriority, Priority defaultPriority, + AccessControlList aclList) { + this.setMaxPriority(Priority.newInstance(maxPriority.getPriority())); + this.setDefaultPriority( + Priority.newInstance(defaultPriority.getPriority())); + this.setACLList(aclList); + } + + public PriorityACLGroup() { + } + + @Override + public int compareTo(PriorityACLGroup o) { + return getMaxPriority().compareTo(o.getMaxPriority()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PriorityACLGroup other = (PriorityACLGroup) obj; + if (getMaxPriority() != other.getMaxPriority()) + return false; + if (getDefaultPriority() != other.getDefaultPriority()) + return false; + return true; + } + + @Override + public int hashCode() { + final int prime = 517861; + int result = 9511; + result = prime * result + getMaxPriority().getPriority(); + result = prime * result + getDefaultPriority().getPriority(); + return result; + } + + public Priority getMaxPriority() { + return maxPriority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public AccessControlList getACLList() { + return aclList; + } + + public void setMaxPriority(Priority maxPriority) { + this.maxPriority = maxPriority; + } + + public void setDefaultPriority(Priority defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setACLList(AccessControlList accessControlList) { + this.aclList = accessControlList; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java new file mode 100644 index 0000000..ffe98bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AppPriorityACLsManager.java @@ -0,0 +1,216 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.security; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PriorityACLGroup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class AppPriorityACLsManager { + + private static final Log LOG = LogFactory + .getLog(AppPriorityACLsManager.class); + + /* + * An internal class to store ACLs specific to each priority. This will be + * used to read and process acl's during app submission time as well. + */ + private static class PriorityACL { + private Priority priority; + private Priority defaultPriority; + private AccessControlList acl; + + PriorityACL(Priority priority, Priority defaultPriority, + AccessControlList acl) { + this.setPriority(priority); + this.setDefaultPriority(defaultPriority); + this.setAcl(acl); + } + + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } + + public Priority getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(Priority priority) { + this.defaultPriority = priority; + } + + public AccessControlList getAcl() { + return acl; + } + + public void setAcl(AccessControlList acl) { + this.acl = acl; + } + } + + private boolean isACLsEnable; + private final ConcurrentMap> allAcls = + new ConcurrentHashMap<>(); + + public AppPriorityACLsManager(Configuration conf) { + this.isACLsEnable = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, + YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); + } + + /** + * Each Queue could have configured with different priority acl's groups. This + * method helps to store each such ACL list against queue. + * + * @param priorityACLGroups + * List of Priority ACL Groups. + * @param queueName + * Queue Name associate with priority acl groups. + */ + public void addPrioirityACLs(List priorityACLGroups, + String queueName) { + + List priorityACL = allAcls.get(queueName); + if (null == priorityACL) { + priorityACL = new ArrayList(); + allAcls.put(queueName, priorityACL); + } + + // Ensure lowest priority PriorityACLGroup comes first in the list. + Collections.sort(priorityACLGroups); + + for (PriorityACLGroup priorityACLGroup : priorityACLGroups) { + priorityACL.add(new PriorityACL(priorityACLGroup.getMaxPriority(), + priorityACLGroup.getDefaultPriority(), + priorityACLGroup.getACLList())); + //if (LOG.isDebugEnabled()) { + LOG.info("Priority ACL group added: max-priority - " + + priorityACLGroup.getMaxPriority() + "default-priority - " + + priorityACLGroup.getDefaultPriority()); + // } + } + } + + /** + * Priority based checkAccess to ensure that given user has enough permission + * to submit application at a given priority level. + * + * @param callerUGI + * User who submits the application. + * @param queueName + * Queue to which application is submitted. + * @param submittedPriority + * priority of the application. + * @return True or False to indicate whether application can be submitted at + * submitted priority level or not. + */ + public boolean checkAccess(UserGroupInformation callerUGI, String queueName, + Priority submittedPriority) { + if (!isACLsEnable) { + return true; + } + + List acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return true; + } + + PriorityACL approvedPriorityACL = getPriorityPerUserACL(acls, + callerUGI, submittedPriority); + if (approvedPriorityACL == null) { + return false; + } + + return true; + } + + /** + * If an application is submitted without any priority, and submitted user has + * a default priority, this method helps to update this default priority as + * app's priority. + * + * @param queueName + * Submitted queue + * @param user + * User who submitted this application + * @return Default priority associated with given user. + */ + public Priority getDefaultPriority(String queueName, + UserGroupInformation user) { + if (!isACLsEnable) { + return null; + } + + List acls = allAcls.get(queueName); + if (acls == null || acls.isEmpty()) { + return null; + } + + PriorityACL approvedPriorityACL = getPriorityPerUserACL(acls, user, + null); + if (approvedPriorityACL == null) { + return null; + } + + Priority defaultPriority = Priority + .newInstance(approvedPriorityACL.getDefaultPriority().getPriority()); + return defaultPriority; + } + + private PriorityACL getPriorityPerUserACL(List acls , + UserGroupInformation user, Priority submittedPriority) { + + // Iterate through all configured ACLs starting from lower priority. + // If user is found corresponding to a configured priority, then store + // that entry. if failed, continue iterate through whole acl list. + PriorityACL selectedAcl = null; + for (PriorityACL entry : acls) { + AccessControlList list = entry.getAcl(); + + if (list.isUserAllowed(user)) { + selectedAcl = entry; + + // If submittedPriority is passed through the argument, also check + // whether submittedPriority is under max-priority of each ACL group. + if (submittedPriority != null) { + selectedAcl = null; + if (submittedPriority.getPriority() <= entry.getPriority() + .getPriority()) { + return entry; + } + } + } + } + return selectedAcl; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java index e661703..fbd5ac3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java @@ -43,6 +43,8 @@ protected static final String COMMON_USER = "common_user"; protected static final String QUEUE_A_USER = "queueA_user"; protected static final String QUEUE_B_USER = "queueB_user"; + protected static final String QUEUE_A_GROUP = "queueA_group"; + protected static final String QUEUE_B_GROUP = "queueB_group"; protected static final String ROOT_ADMIN = "root_admin"; protected static final String QUEUE_A_ADMIN = "queueA_admin"; protected static final String QUEUE_B_ADMIN = "queueB_admin"; @@ -53,7 +55,7 @@ protected static final Log LOG = LogFactory.getLog(TestApplicationACLs.class); - MockRM resourceManager; + protected MockRM resourceManager; Configuration conf; YarnRPC rpc; InetSocketAddress rmAddress; @@ -68,6 +70,7 @@ public void setup() throws InterruptedException, IOException { AccessControlList adminACL = new AccessControlList(""); conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString()); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); resourceManager = new MockRM(conf) { protected ClientRMService createClientRMService() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 00466ae..23bed22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -463,7 +464,9 @@ public void testPriorityInAllocatedResponse() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(), + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + rm.getRMAppManager().updateApplicationPriority(ugi, app1.getApplicationId(), appPriority2); AllocateResponse response2 = am1.allocate(allocateRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index cb57f39..79e481f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1120,7 +1120,7 @@ private static YarnScheduler mockYarnScheduler() throws YarnException { when(yarnScheduler.getResourceCalculator()).thenReturn(rs); when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class), - anyString(), anyString(), any(ApplicationId.class))) + any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) .thenReturn(Priority.newInstance(0)); return yarnScheduler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 164ca20..ff52efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -344,7 +345,10 @@ public void testUpdatePriorityAtRuntime() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -378,7 +382,10 @@ public void testUpdateInvalidPriorityAtRuntime() throws Exception { // Change the priority of App1 to 15 Priority appPriority2 = Priority.newInstance(15); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() @@ -428,7 +435,10 @@ public void testRMRestartWithChangeInPriority() throws Exception { // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); - cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app1.getUser()); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null, + ugi); // let things settle down Thread.sleep(1000); @@ -557,7 +567,10 @@ public void testApplicationPriorityAllocationWithChangeInPriority() // Change the priority of App1 to 3 (lowest) Priority appPriority3 = Priority.newInstance(3); - cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app2.getUser()); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null, + ugi); // add request for containers App2 am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList()); @@ -788,8 +801,10 @@ private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, int appsPendingExpected, int activeAppsExpected, RMApp app) throws YarnException { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser(app.getUser()); cs.updateApplicationPriority(Priority.newInstance(2), - app.getApplicationId(), null); + app.getApplicationId(), null, ugi); SchedulerEvent removeAttempt; removeAttempt = new AppAttemptRemovedSchedulerEvent( app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java new file mode 100644 index 0000000..5eadeb1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLConfiguration.java @@ -0,0 +1,120 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLConfiguration { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = -1; + private final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 10; + private final int clusterMaxPriority = 10; + + private static final String QUEUE_A_USER = "queueA_user"; + private static final String QUEUE_B_USER = "queueB_user"; + private static final String QUEUE_A_GROUP = "queueA_group"; + + private static final String QUEUEA = "queueA"; + private static final String QUEUEB = "queueB"; + private static final String QUEUEC = "queueC"; + + @Test + public void testSimpleACLConfiguration() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + // Try to get the ACL configs and make sure there are errors/exceptions + List pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + + // Validate! + verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + } + + @Test + public void testACLConfigurationForInvalidCases() throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + // Success case: Configure one user/group level priority acl for queue A. + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[1]; + aclsForB[0] = QUEUE_B_USER; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + // Try to get the ACL configs and make sure there are errors/exceptions + List pGroupA = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(clusterMaxPriority)); + List pGroupB = csConf.getPriorityAcls( + CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(clusterMaxPriority)); + + // Validate stored ACL values with configured ones. + verifyACLs(pGroupA, QUEUE_A_USER, QUEUE_A_GROUP, maxPriorityQueueA, + defaultPriorityQueueA); + verifyACLs(pGroupB, QUEUE_B_USER, "", maxPriorityQueueB, 0); + } + + private void verifyACLs(List pGroup, String queueUser, + String queueGroup, int maxPriority, int defaultPriority) { + PriorityACLGroup group = pGroup.get(0); + String aclString = queueUser + " " + queueGroup; + + Assert.assertEquals(aclString.trim(), + group.getACLList().getAclString().trim()); + Assert.assertEquals(maxPriority, group.getMaxPriority().getPriority()); + Assert.assertEquals(defaultPriority, + group.getDefaultPriority().getPriority()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java new file mode 100644 index 0000000..b41ba83 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriorityACLs.java @@ -0,0 +1,206 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ACLsTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + + +public class TestApplicationPriorityACLs extends ACLsTestBase { + + private final int defaultPriorityQueueA = 3; + private final int defaultPriorityQueueB = 10; + private final int maxPriorityQueueA = 5; + private final int maxPriorityQueueB = 11; + private final int clusterMaxPriority = 10; + + @Test + public void testApplicationACLs() throws Exception { + + /* + * Cluster Max-priority is 10. User 'queueA_user' has permission to submit + * apps only at priority 5. Default priority for this user is 3. + */ + + // Case 1: App will be submitted with priority 5. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, 5); + + // Case 2: App will be rejected as submitted priority was 6. + verifyAppSubmitWithPriorityFailure(QUEUE_A_USER, QUEUEA, 6); + + // Case 3: App will be submitted w/o priority, hence consider default 3. + verifyAppSubmitWithPrioritySuccess(QUEUE_A_USER, QUEUEA, -1); + + // Case 4: App will be submitted with priority 11. + verifyAppSubmitWithPrioritySuccess(QUEUE_B_USER, QUEUEB, 11); + } + + private void verifyAppSubmitWithPrioritySuccess(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = null; + if (priority > 0) { + appPriority = Priority.newInstance(priority); + } else { + // RM will consider default priority for the submitted user. So update + // priority to the default value to compare. + priority = defaultPriorityQueueA; + } + + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithValidAcl(submitter, submissionContext); + + // Ideally get app report here and check the priority. + verifyAppPriorityIsAccepted(submitter, submissionContext.getApplicationId(), + priority); + } + + private void verifyAppSubmitWithPriorityFailure(String submitter, + String queueName, int priority) throws Exception { + Priority appPriority = Priority.newInstance(priority); + ApplicationSubmissionContext submissionContext = prepareForAppSubmission( + submitter, queueName, appPriority); + submitAppToRMWithInValidAcl(submitter, submissionContext); + } + + private ApplicationSubmissionContext prepareForAppSubmission(String submitter, + String queueName, Priority priority) throws Exception { + + GetNewApplicationRequest newAppRequest = GetNewApplicationRequest + .newInstance(); + + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + ApplicationId applicationId = submitterClient + .getNewApplication(newAppRequest).getApplicationId(); + + Resource resource = BuilderUtils.newResource(1024, 1); + + ContainerLaunchContext amContainerSpec = ContainerLaunchContext + .newInstance(null, null, null, null, null, null); + ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext + .newInstance(applicationId, "applicationName", queueName, null, + amContainerSpec, false, true, 1, resource, "applicationType"); + appSubmissionContext.setApplicationId(applicationId); + appSubmissionContext.setQueue(queueName); + if (null != priority) { + appSubmissionContext.setPriority(priority); + } + + return appSubmissionContext; + } + + private void submitAppToRMWithValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + submitterClient.submitApplication(submitRequest); + resourceManager.waitForState(appSubmissionContext.getApplicationId(), + RMAppState.ACCEPTED); + } + + private void submitAppToRMWithInValidAcl(String submitter, + ApplicationSubmissionContext appSubmissionContext) + throws YarnException, IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + SubmitApplicationRequest submitRequest = SubmitApplicationRequest + .newInstance(appSubmissionContext); + try { + submitterClient.submitApplication(submitRequest); + } catch (YarnException ex) { + Assert.assertTrue(ex.getCause() instanceof RemoteException); + } + } + + private void verifyAppPriorityIsAccepted(String submitter, + ApplicationId applicationId, int priority) + throws IOException, InterruptedException { + ApplicationClientProtocol submitterClient = getRMClientForUser(submitter); + + /** + * If priority is greater than cluster max, RM will auto set to cluster max + * Consider this scenario as a special case. + */ + if (priority > clusterMaxPriority) { + priority = clusterMaxPriority; + } + + GetApplicationReportRequest request = GetApplicationReportRequest + .newInstance(applicationId); + try { + GetApplicationReportResponse response = submitterClient + .getApplicationReport(request); + Assert.assertEquals(response.getApplicationReport().getPriority(), + Priority.newInstance(priority)); + } catch (YarnException e) { + Assert.fail("Application submission should not fail."); + } + } + + @Override + protected Configuration createConfiguration() { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{QUEUEA, QUEUEB, QUEUEC}); + + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, 50f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, 25f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + QUEUEC, 25f); + + String[] aclsForA = new String[2]; + aclsForA[0] = QUEUE_A_USER; + aclsForA[1] = QUEUE_A_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEA, + Priority.newInstance(maxPriorityQueueA), + Priority.newInstance(defaultPriorityQueueA), aclsForA); + + String[] aclsForB = new String[2]; + aclsForB[0] = QUEUE_B_USER; + aclsForB[1] = QUEUE_B_GROUP; + csConf.setPriorityAcls(CapacitySchedulerConfiguration.ROOT + "." + QUEUEB, + Priority.newInstance(maxPriorityQueueB), + Priority.newInstance(defaultPriorityQueueB), aclsForB); + + csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + csConf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + + return csConf; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index a36db44..1348f51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -856,7 +857,10 @@ public void testQueueAcl() throws Exception { TestUtils.spyHook); YarnAuthorizationProvider authorizer = YarnAuthorizationProvider.getInstance(conf); - CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues); + AppPriorityACLsManager appPriorityACLManager = new AppPriorityACLsManager( + conf); + CapacitySchedulerQueueManager.setQueueAcls(authorizer, + appPriorityACLManager, queues); UserGroupInformation user = UserGroupInformation.getCurrentUser(); // Setup queue configs -- 2.7.4 (Apple Git-66)