diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ac1a26c..4c9307b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -135,6 +135,9 @@ private volatile long maxApplicationLifetime = -1; private volatile long defaultApplicationLifetime = -1; + private Map amResourceLimitPerUserPerPartition = + new HashMap(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -695,6 +698,7 @@ public Resource getUserAMResourceLimitPerPartition( */ float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); + float preWeightedUserLimit = effectiveUserLimit; effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); Resource queuePartitionResource = getEffectiveCapacity(nodePartition); @@ -704,10 +708,32 @@ public Resource getUserAMResourceLimitPerPartition( queueCapacities.getMaxAMResourcePercentage(nodePartition) * effectiveUserLimit * usersManager.getUserLimitFactor(), minimumAllocation); - return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAMLimit = + Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? userAMLimit : getAMResourceLimitPerPartition(nodePartition); + + Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition) + * preWeightedUserLimit * usersManager.getUserLimitFactor(), + minimumAllocation); + preWeighteduserAMLimit = + Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + preWeighteduserAMLimit, + getAMResourceLimitPerPartition(nodePartition)) + ? preWeighteduserAMLimit + : getAMResourceLimitPerPartition(nodePartition); + amResourceLimitPerUserPerPartition.put(nodePartition, + preWeighteduserAMLimit); + + if (LOG.isDebugEnabled()) { + LOG.debug("Effective user AM limit for \"" + userName + "\":" + + preWeighteduserAMLimit + ". " + "Effective weighted user AM limit: " + + userAMLimit + ". User weight: " + userWeight); + } + return userAMLimit; } finally { readLock.unlock(); } @@ -2176,4 +2202,9 @@ public long getMaximumApplicationLifetime() { public long getDefaultApplicationLifetime() { return defaultApplicationLifetime; } + + public Map getAmResourceLimitPerUserPerPartition() { + return amResourceLimitPerUserPerPartition; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java index a1a8ecf..2e791db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.Map; + import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourcesInfo; +import org.apache.hadoop.yarn.util.resource.Resources; @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) @@ -39,12 +42,13 @@ protected ResourcesInfo resources; private float userWeight; private boolean isActive; + private Map amLimitPerUserPerPartition; UserInfo() {} UserInfo(String username, Resource resUsed, int activeApps, int pendingApps, Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage, - float weight, boolean isActive) { + float weight, boolean isActive, Map userAMLimits) { this.username = username; this.resourcesUsed = new ResourceInfo(resUsed); this.numActiveApplications = activeApps; @@ -54,6 +58,7 @@ this.resources = new ResourcesInfo(resourceUsage); this.userWeight = weight; this.isActive = isActive; + this.amLimitPerUserPerPartition = userAMLimits; } public String getUsername() { @@ -91,4 +96,38 @@ public float getUserWeight() { public boolean getIsActive() { return isActive; } + + /** + * This method retrieves the specified user's max AM resource value. If the + * user has a weight not equal to 1.0, the value may be an approximation of + * the max AM resource limit that is actually used by the scheduler. + * + * @param nodePartition The queue's node label + * @param shouldWeight If true, multiply the userAMLimit by the user's weight. + * @return The specified user's AM limit. Could be null + */ + public ResourceInfo getAmLimitPerUserPerPartition(String nodePartition, + boolean shouldWeight) { + ResourceInfo userAMLimitInfo = null; + ResourceInfo queueAMLimitInfo = + getResourceUsageInfo().getPartitionResourceUsageInfo( + (nodePartition == null) ? "" : nodePartition).getAMLimit(); + + // If the most recent per-user AM limit is not available, use the most + // recent Max AM Resource limit that was computed for this user. If this + // value is retrieved, there is no need to multiply it's weight. + if (amLimitPerUserPerPartition == null) { + userAMLimitInfo = queueAMLimitInfo; + } else { + Resource userAMLimit = + amLimitPerUserPerPartition.get( + (nodePartition == null) ? "" : nodePartition); + if (userAMLimit != null && shouldWeight) { + userAMLimitInfo = new ResourceInfo( + Resources.multiply(userAMLimit, userWeight)); + } + } + + return userAMLimitInfo; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 7287c5b..540ded8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -457,7 +457,8 @@ private void addUser(String userName, User user) { Resources.clone(user.getConsumedAMResources()), Resources.clone(user.getUserResourceLimit()), user.getResourceUsage(), user.getWeight(), - activeUsersSet.contains(user.userName))); + activeUsersSet.contains(user.userName), + lQueue.getAmResourceLimitPerUserPerPartition())); } return usersToReturn; } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index de85590..b7ad430 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -144,13 +144,14 @@ private void renderQueueCapacityInfo(ResponseInfo ri, String label) { // Get UserInfo from first user to calculate AM Resource Limit per user. ResourceInfo userAMResourceLimit = null; ArrayList usersList = lqinfo.getUsers().getUsersList(); - if (usersList.isEmpty()) { - // If no users are present, consider AM Limit for that queue. - userAMResourceLimit = resourceUsages.getAMLimit(); - } else { + if (!usersList.isEmpty()) { userAMResourceLimit = usersList.get(0) - .getResourceUsageInfo().getPartitionResourceUsageInfo(label) - .getAMLimit(); + .getAmLimitPerUserPerPartition(label, false); + } + // If no users are present or if AM limit per user doesn't exist, retrieve + // AM Limit for that queue. + if (userAMResourceLimit == null) { + userAMResourceLimit = resourceUsages.getAMLimit(); } ResourceInfo amUsed = (resourceUsages.getAmUsed() == null) ? new ResourceInfo(Resources.none()) @@ -238,8 +239,14 @@ protected void render(Block html) { ArrayList users = lqinfo.getUsers().getUsersList(); for (UserInfo userInfo : users) { ResourceInfo resourcesUsed = userInfo.getResourcesUsed(); - PartitionResourcesInfo resourceUsages = userInfo.getResourceUsageInfo() - .getPartitionResourceUsageInfo((nodeLabel == null) ? "" : nodeLabel); + ResourceInfo userAMLimitPerPartition = + userInfo.getAmLimitPerUserPerPartition(nodeLabel, true); + // If AM limit per user is null, use the AM limit for the queue level. + if (userAMLimitPerPartition == null) { + userAMLimitPerPartition = + lqinfo.getResources().getPartitionResourceUsageInfo(nodeLabel) + .getAMLimit(); + } if (nodeLabel != null) { resourcesUsed = userInfo.getResourceUsageInfo() .getPartitionResourceUsageInfo(nodeLabel).getUsed(); @@ -254,7 +261,7 @@ protected void render(Block html) { .td(userInfo.getUserResourceLimit().toString()) .td(String.valueOf(userInfo.getUserWeight())) .td(resourcesUsed.toString()) - .td(resourceUsages.getAMLimit().toString()) + .td(userAMLimitPerPartition.toString()) .td(amUsed.toString()) .td(Integer.toString(userInfo.getNumActiveApplications())) .td(Integer.toString(userInfo.getNumPendingApplications())).__();