diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 43ec390..8e9f6bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -103,11 +103,17 @@ @Private public static final String USER_LIMIT = "minimum-user-limit-percent"; - + @Private public static final String USER_LIMIT_FACTOR = "user-limit-factor"; @Private + public static final String USER_WEIGHT = "weight"; + + @Private + public static final float DEFAULT_USER_WEIGHT = 1.0f; + + @Private public static final String STATE = "state"; @Private @@ -1392,4 +1398,30 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation); } + + /** + * Get the weight of a user. Used in computing user-specific user limit, + * relative to other users. + * @param queuePath full queue path + * @param user user name to check for a specific weight + * @return user-specific weight, if it exists. Otherwise, return 1.0f + */ + public float getUserWeight(String queuePath, String user) { + // Walk the queue's hierarchy from the bottom up. Look for a weight that is + // specific to user at each level and stop when the value is not the + // default. 
If none exists for this user at any level of the queue path, + // use the default. + float userWeight = DEFAULT_USER_WEIGHT; + int offset = queuePath.length(); + do { + String qp = queuePath.substring(0, offset); + String weightKey = + getQueuePrefix(qp) + "user-settings." + user + "." + USER_WEIGHT; + userWeight = getFloat(weightKey, DEFAULT_USER_WEIGHT); + if (userWeight != DEFAULT_USER_WEIGHT) { + return userWeight; + } + } while ((offset = queuePath.lastIndexOf(".", offset-1)) > -1); + return userWeight; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 8cae6c3..21dff22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -153,6 +153,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) setQueueAcls(authorizer, appPriorityACLManager, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); this.queueStateManager.initialize(this); + updateUserWeights(conf, queues); LOG.info("Initialized root queue " + root); } @@ -182,6 +183,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) labelManager.reinitializeQueueLabels(getQueueToLabels()); this.queueStateManager.initialize(this); + updateUserWeights(newConf, queues); } /** @@ -404,4 +406,19 @@ public Priority 
getDefaultPriorityForQueue(String queueName) { getQueueStateManager() { return this.queueStateManager; } + + /** + * Update weights for users currently in leaf queues. + * @param conf the CapacitySchedulerConfiguration + * @param queues the queues + */ + private static void updateUserWeights(CapacitySchedulerConfiguration conf, + Map queues) { + for (CSQueue queue : queues.values()) { + if (queue instanceof LeafQueue) { + LeafQueue lQueue = (LeafQueue) queue; + lQueue.getUsersManager().updateUserWeights(conf); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1b20556..d89c12f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -617,11 +617,21 @@ public Resource calculateAndGetAMResourceLimit() { @VisibleForTesting public Resource getUserAMResourceLimit() { - return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); + return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL, + null); } public Resource getUserAMResourceLimitPerPartition( String nodePartition) { + return getUserAMResourceLimitPerPartition(nodePartition, null); + } + + public Resource getUserAMResourceLimitPerPartition( + String nodePartition, String userName) { + int userLimit = getUserLimit(); + if (userName != null && getUser(userName) != null) { + userLimit *= getUser(userName).getUserWeight(); + } try { 
readLock.lock(); /* @@ -630,7 +640,7 @@ public Resource getUserAMResourceLimitPerPartition( * the absolute queue capacity (per partition) instead of the max and is * modified by the userlimit and the userlimit factor as is the userlimit */ - float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, + float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); Resource queuePartitionResource = Resources @@ -772,7 +782,8 @@ private void activateApplications() { // Verify whether we already calculated user-am-limit for this label. if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAMLimit = getUserAMResourceLimitPerPartition(partitionName, + application.getUser()); userAmPartitionLimit.put(partitionName, userAMLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java index ff9d304..a1a8ecf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java @@ -37,11 +37,14 @@ protected ResourceInfo AMResourceUsed; protected ResourceInfo userResourceLimit; protected ResourcesInfo resources; + private float userWeight; + private boolean isActive; UserInfo() {} UserInfo(String username, Resource resUsed, int activeApps, int pendingApps, - Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) { + Resource 
amResUsed, Resource resourceLimit, ResourceUsage resourceUsage, + float weight, boolean isActive) { this.username = username; this.resourcesUsed = new ResourceInfo(resUsed); this.numActiveApplications = activeApps; @@ -49,6 +52,8 @@ this.AMResourceUsed = new ResourceInfo(amResUsed); this.userResourceLimit = new ResourceInfo(resourceLimit); this.resources = new ResourcesInfo(resourceUsage); + this.userWeight = weight; + this.isActive = isActive; } public String getUsername() { @@ -78,4 +83,12 @@ public ResourceInfo getUserResourceLimit() { public ResourcesInfo getResourceUsageInfo() { return resources; } + + public float getUserWeight() { + return userWeight; + } + + public boolean getIsActive() { + return isActive; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index c2134eb..383bcf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -72,10 +73,6 @@ // To detect whether there is a change in user count for every user-limit // calculation. 
private AtomicLong latestVersionOfUsersState = new AtomicLong(0); - private Map> localVersionOfActiveUsersState = - new HashMap>(); - private Map> localVersionOfAllUsersState = - new HashMap>(); private volatile int userLimit; private volatile float userLimitFactor; @@ -88,9 +85,8 @@ private Map> usersApplications = new HashMap>(); - // Pre-computed list of user-limits. - Map> preComputedActiveUserLimit = new ConcurrentHashMap<>(); - Map> preComputedAllUserLimit = new ConcurrentHashMap<>(); + private int sumActiveUsersMinimumUserLimitPercent; + private int sumAllUsersMinimumUserLimitPercent; /** * UsageRatios will store the total used resources ratio across all users of @@ -159,6 +155,23 @@ private void setUsageRatio(String label, float ratio) { private UsageRatios userUsageRatios = new UsageRatios(); private WriteLock writeLock; + // User limits are kept per user, per partition, and per scheduling mode. + // Store this user's state and user limit, calculated relative to active + // users. + private Map> activeUserLimit = + new ConcurrentHashMap<>(); + private Map> activeUsersState = + new HashMap>(); + + // Store this user's state and user limit calculated relative to all users. + private Map> allUserLimit = + new ConcurrentHashMap<>(); + private Map> allUsersState = + new HashMap>(); + + // User-specific weight for computing user-specific minimum user limit pct. 
+ private float weight; + public User(String name) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Nobody uses read-lock now, will add it when necessary @@ -253,6 +266,32 @@ public Resource getUserResourceLimit() { public void setUserResourceLimit(Resource userResourceLimit) { this.userResourceLimit = userResourceLimit; } + + public Map> + getActiveUserLimit() { + return activeUserLimit; + } + + public Map> + getAllUserLimit() { + return allUserLimit; + } + + public Map> getActiveUsersState() { + return activeUsersState; + } + + public Map> getAllUsersState() { + return allUsersState; + } + + public void setUserWeight(float userWeight) { + weight = userWeight; + } + + public float getUserWeight() { + return weight; + } } /* End of User class */ /** @@ -268,6 +307,8 @@ public void setUserResourceLimit(Resource userResourceLimit) { * Capacity Scheduler Context * @param resourceCalculator * rc + * @param ctxt + * capacity scheduler context */ public UsersManager(QueueMetrics metrics, LeafQueue lQueue, RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler, @@ -373,6 +414,8 @@ public void removeUser(String userName) { // Remove user from active/non-active list as well. activeUsersSet.remove(userName); nonActiveUsersSet.remove(userName); + computeSumActiveUsersMinimumUserLimitPercent(); + computeSumAllUsersMinimumUserLimitPercent(); } finally { writeLock.unlock(); } @@ -409,6 +452,31 @@ public User getUserAndAddIfAbsent(String userName) { */ private void addUser(String userName, User user) { this.users.put(userName, user); + CapacitySchedulerConfiguration conf = scheduler.getConfiguration(); + user.setUserWeight(conf.getUserWeight(lQueue.getQueuePath(), userName)); + computeSumAllUsersMinimumUserLimitPercent(); + } + + // Add up the weighted minimum user limit percents for each user. + // Since each user can have its own weight, the MULP sum may not necessarily + // equal 100%. 
This will be normalized to 100% when computing an individual + // user's limit resource. + // Compute this only when a user is added or removed so that it doesn't need + // to be done in the computeUserLimit thread. + private void computeSumAllUsersMinimumUserLimitPercent() { + sumAllUsersMinimumUserLimitPercent = 0; + for (User u : users.values()) { + sumAllUsersMinimumUserLimitPercent += + Math.round(u.getUserWeight() * getUserLimit()); + } + } + + private void computeSumActiveUsersMinimumUserLimitPercent() { + sumActiveUsersMinimumUserLimitPercent = 0; + for (String userName : activeUsersSet) { + sumActiveUsersMinimumUserLimitPercent += + Math.round(getUser(userName).getUserWeight() * getUserLimit()); + } } /** @@ -425,7 +493,8 @@ private void addUser(String userName, User user) { user.getActiveApplications(), user.getPendingApplications(), Resources.clone(user.getConsumedAMResources()), Resources.clone(user.getUserResourceLimit()), - user.getResourceUsage())); + user.getResourceUsage(), user.getUserWeight(), + activeUsersSet.contains(user.userName))); } return usersToReturn; } finally { @@ -452,20 +521,25 @@ private void addUser(String userName, User user) { public Resource getComputedResourceLimitForActiveUsers(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode) { + if (getUser(userName) == null + || getUser(userName).getActiveUserLimit() == null) { + return Resource.newInstance(0, 0); + } - Map userLimitPerSchedulingMode = preComputedActiveUserLimit - .get(nodePartition); + Map userLimitPerSchedulingMode = + getUser(userName).getActiveUserLimit().get(nodePartition); try { writeLock.lock(); - if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { + if (isRecomputeNeeded(userName, schedulingMode, nodePartition, true)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition, clusterResource, schedulingMode, true); // update user count to cache so that we can avoid recompute if no 
major // changes. - setLocalVersionOfUsersState(nodePartition, schedulingMode, true); + setLocalVersionOfUsersState(getUser(userName), nodePartition, + schedulingMode, true); } } finally { writeLock.unlock(); } @@ -499,20 +573,26 @@ public Resource getComputedResourceLimitForActiveUsers(String userName, public Resource getComputedResourceLimitForAllUsers(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode) { + if (getUser(userName) == null + || getUser(userName).getAllUserLimit() == null) { + return Resource.newInstance(0, 0); + } - Map userLimitPerSchedulingMode = preComputedAllUserLimit - .get(nodePartition); + User user = getUser(userName); + Map userLimitPerSchedulingMode = + user.getAllUserLimit().get(nodePartition); try { writeLock.lock(); - if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { + if (isRecomputeNeeded(userName, schedulingMode, nodePartition, false)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition, clusterResource, schedulingMode, false); // update user count to cache so that we can avoid recompute if no major // changes. - setLocalVersionOfUsersState(nodePartition, schedulingMode, false); + setLocalVersionOfUsersState(getUser(userName), nodePartition, + schedulingMode, false); } } finally { writeLock.unlock(); } @@ -532,22 +612,22 @@ public Resource getComputedResourceLimitForAllUsers(String userName, * not exist in local map. 2. Total User count doesn't match with local cached * version. 
*/ - private boolean isRecomputeNeeded(SchedulingMode schedulingMode, - String nodePartition, boolean isActive) { - return (getLocalVersionOfUsersState(nodePartition, schedulingMode, - isActive) != latestVersionOfUsersState.get()); + private boolean isRecomputeNeeded(String userName, + SchedulingMode schedulingMode, String nodePartition, boolean isActive) { + return (getLocalVersionOfUsersState(getUser(userName), nodePartition, + schedulingMode, isActive) != latestVersionOfUsersState.get()); } /* * Set Local version of user count per label to invalidate cache if needed. */ - private void setLocalVersionOfUsersState(String nodePartition, + private void setLocalVersionOfUsersState(User user, String nodePartition, SchedulingMode schedulingMode, boolean isActive) { try { writeLock.lock(); Map> localVersionOfUsersState = (isActive) - ? localVersionOfActiveUsersState - : localVersionOfAllUsersState; + ? user.getActiveUsersState() + : user.getAllUsersState(); Map localVersion = localVersionOfUsersState .get(nodePartition); @@ -565,13 +645,13 @@ private void setLocalVersionOfUsersState(String nodePartition, /* * Get Local version of user count per label to invalidate cache if needed. */ - private long getLocalVersionOfUsersState(String nodePartition, + private long getLocalVersionOfUsersState(User user, String nodePartition, SchedulingMode schedulingMode, boolean isActive) { try { this.readLock.lock(); Map> localVersionOfUsersState = (isActive) - ? localVersionOfActiveUsersState - : localVersionOfAllUsersState; + ? user.getActiveUsersState() + : user.getAllUsersState(); if (!localVersionOfUsersState.containsKey(nodePartition)) { return -1; @@ -593,11 +673,12 @@ private long getLocalVersionOfUsersState(String nodePartition, String nodePartition, Resource clusterResource, SchedulingMode schedulingMode, boolean activeMode) { + User user = getUser(userName); // preselect stored map as per active user-limit or all user computation. 
Map> computedMap = null; computedMap = (activeMode) - ? preComputedActiveUserLimit - : preComputedAllUserLimit; + ? user.getActiveUserLimit() + : user.getAllUserLimit(); Map userLimitPerSchedulingMode = computedMap .get(nodePartition); @@ -647,16 +728,19 @@ private Resource computeUserLimit(String userName, Resource clusterResource, queueCapacity, required); /* - * We want to base the userLimit calculation on max(queueCapacity, - * usedResources+required). However, we want usedResources to be based on - * the combined ratios of all the users in the queue so we use consumedRatio - * to calculate such. The calculation is dependent on how the - * resourceCalculator calculates the ratio between two Resources. DRF - * Example: If usedResources is greater than queueCapacity and users have - * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is - * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio - * is then 20+30=50%. Yes, this value can be larger than 100% but for the - * purposes of making sure all users are getting their fair share, it works. + * We want to base the userLimit calculation on + * max(queueCapacity, usedResources+required). However, we want + * usedResources to be based on the combined ratios of all the users in the + * queue so we use consumedRatio to calculate such. + * The calculation is dependent on how the resourceCalculator calculates the + * ratio between two Resources. DRF Example: If usedResources is greater + * than queueCapacity and users have the following [mem,cpu] usages: + * + * User1: [10%,20%] - Dominant resource is 20% + * User2: [30%,10%] - Dominant resource is 30% + * Then total consumedRatio is then 20+30=50%. Yes, this value can be + * larger than 100% but for the purposes of making sure all users are + * getting their fair share, it works. 
*/ Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, partitionResource, getUsageRatio(nodePartition), @@ -674,22 +758,43 @@ private Resource computeUserLimit(String userName, Resource clusterResource, int usersCount = getNumActiveUsers(); Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition); + int sumUsersMinimumUserLimitPercent = sumActiveUsersMinimumUserLimitPercent; + // For non-activeUser calculation, consider all users count. if (!activeUser) { resourceUsed = currentCapacity; usersCount = users.size(); + sumUsersMinimumUserLimitPercent = sumAllUsersMinimumUserLimitPercent; + } + + // Since each user's MULP can be different, first make sure that every + // user's MULP has been met. Then, if any percentage is left over, divide + // that evenly among all users. + // The calculation is: + // (current user's user-limit-percent) + + // ( (100 - min(sum(each user's user-limit-percent), 100)) + // / #users ) + User user = getUser(userName); + int localUserLimit = Math.round(user.getUserWeight() * getUserLimit()); + if (usersCount > 0) { + sumUsersMinimumUserLimitPercent = + Math.min(sumUsersMinimumUserLimitPercent, 100); + localUserLimit += (100-sumUsersMinimumUserLimitPercent) / usersCount; } /* - * User limit resource is determined by: max{currentCapacity / #activeUsers, - * currentCapacity * user-limit-percentage%) + * User limit resource is determined by: + * max{% of active resources due to user, + * currentCapacity * user-limit-percentage%) */ Resource userLimitResource = Resources.max(resourceCalculator, partitionResource, - Resources.divideAndCeil(resourceCalculator, resourceUsed, - usersCount), Resources.divideAndCeil(resourceCalculator, - Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()), + Resources.multiplyAndRoundDown(resourceUsed, localUserLimit), + 100), + Resources.divideAndCeil(resourceCalculator, + Resources.multiplyAndRoundDown(currentCapacity, + user.getUserWeight() * getUserLimit()), 
100)); // User limit is capped by maxUserLimit @@ -717,19 +822,31 @@ private Resource computeUserLimit(String userName, Resource clusterResource, userLimitResource, maxUserLimit), lQueue.getMinimumAllocation()); - if (LOG.isDebugEnabled()) { - LOG.debug("User limit computation for " + userName + " in queue " - + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit() - + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: " - + required + " consumed: " + consumed + " user-limit-resource: " - + userLimitResource + " queueCapacity: " + queueCapacity - + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed() - + " currentCapacity: " + currentCapacity + " activeUsers: " - + usersCount + " clusterCapacity: " + clusterResource - + " resourceByLabel: " + partitionResource + " usageratio: " - + getUsageRatio(nodePartition) + " Partition: " + nodePartition); - } + if (LOG.isDebugEnabled()) { + LOG.debug("User limit computation for " + userName + + ", in queue: " + lQueue.getQueueName() + + ", userLimitPercent=" + lQueue.getUserLimit() + + ", userLimitFactor=" + lQueue.getUserLimitFactor() + + ", required=" + required + + ", consumed=" + consumed + + ", user-limit-resource=" + userLimitResource + + ", queueCapacity=" + queueCapacity + + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed() + + ", currentCapacity=" + currentCapacity + + ", activeUsers=" + usersCount + + ", clusterCapacity=" + clusterResource + + ", resourceByLabel=" + partitionResource + + ", usageratio=" + getUsageRatio(nodePartition) + + ", Partition=" + nodePartition + + ", userWeight=" + user.getUserWeight() + + ", resourceUsed=" + resourceUsed + + ", maxUserLimit=" + maxUserLimit + + ", sumUsersMinimumUserLimitPercent=" + sumUsersMinimumUserLimitPercent + + ", localUserLimit=" + localUserLimit + ); + } getUser(userName).setUserResourceLimit(userLimitResource); + return userLimitResource; } @@ -841,6 +958,7 @@ private void updateActiveUsersResourceUsage(String 
userName) { if (nonActiveUsersSet.contains(userName)) { nonActiveUsersSet.remove(userName); activeUsersSet.add(userName); + computeSumActiveUsersMinimumUserLimitPercent(); // Update total resource usage of active and non-active after user // is moved from non-active to active. @@ -881,6 +999,7 @@ private void updateNonActiveUsersResourceUsage(String userName) { if (activeUsersSet.contains(userName)) { activeUsersSet.remove(userName); nonActiveUsersSet.add(userName); + computeSumActiveUsersMinimumUserLimitPercent(); // Update total resource usage of active and non-active after user is // moved from active to non-active. @@ -981,4 +1100,19 @@ private void updateResourceUsagePerUser(User user, Resource resource, + totalResUsageForNonActiveUsers.getAllUsed()); } } -} + + void updateUserWeights(CapacitySchedulerConfiguration conf) { + try { + this.writeLock.lock(); + for (Entry u : users.entrySet()) { + u.getValue().setUserWeight( + conf.getUserWeight(lQueue.getQueuePath(), u.getKey())); + } + } finally { + this.writeLock.unlock(); + } + userLimitNeedsRecompute(); + computeSumActiveUsersMinimumUserLimitPercent(); + computeSumAllUsersMinimumUserLimitPercent(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index fea29bb..31fed84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -880,8 +880,8 @@ protected void getPendingAppDiagnosticMessage( .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName)); diagnosticMessage.append("; "); diagnosticMessage.append("User AM Resource Limit of the queue = "); - diagnosticMessage.append( - queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName)); + diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition( + appAMNodePartitionName, getUser())); diagnosticMessage.append("; "); diagnosticMessage.append("Queue AM Resource Usage = "); diagnosticMessage.append( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index b972428..dc6e8b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -68,6 +68,7 @@ "left:0%;background:none;border:1px dashed #BFBFBF"; static final String Q_OVER = "background:#FFA333"; static final String Q_UNDER = "background:#5BD75B"; + static final String ACTIVE_USER = "background:#FFFF00"; @RequestScoped static class CSQInfo { @@ -209,6 +210,7 @@ protected void render(Block html) { html.table("#userinfo").thead().$class("ui-widget-header").tr().th() .$class("ui-state-default")._("User Name")._().th() 
.$class("ui-state-default")._("Max Resource")._().th() + .$class("ui-state-default")._("Weight")._().th() .$class("ui-state-default")._("Used Resource")._().th() .$class("ui-state-default")._("Max AM Resource")._().th() .$class("ui-state-default")._("Used AM Resource")._().th() @@ -229,8 +231,11 @@ protected void render(Block html) { ResourceInfo amUsed = (resourceUsages.getAmUsed() == null) ? new ResourceInfo(Resources.none()) : resourceUsages.getAmUsed(); - tbody.tr().td(userInfo.getUsername()) + String highlightIfAsking = + userInfo.getIsActive() ? ACTIVE_USER : null; + tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername()) .td(userInfo.getUserResourceLimit().toString()) + .td(String.valueOf(userInfo.getUserWeight())) .td(resourcesUsed.toString()) .td(resourceUsages.getAMLimit().toString()) .td(amUsed.toString()) @@ -399,6 +404,8 @@ public void render(Block html) { _("Used (over capacity)")._(). span().$class("qlegend ui-corner-all ui-state-default"). _("Max Capacity")._(). + span().$class("qlegend ui-corner-all").$style(ACTIVE_USER). + _("Users Requesting Resources")._(). 
_(); float used = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3fbbae3..e591a8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -884,9 +884,11 @@ public void testUserLimits() throws Exception { // Setup some nodes String host_0 = "127.0.0.1"; - FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = + TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "127.0.0.2"; - FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = + TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = @@ -954,7 +956,113 @@ public void testUserLimits() throws Exception { // app_0 doesn't have outstanding resources, there's only one active user. assertEquals("There should only be 1 active user!", 1, a.getAbstractUsersManager().getNumActiveUsers()); + } + + @Test + public void testUserSpecificUserLimits() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Set user-limit + a.setUserLimit(50); + csConf.setUserLimit(a.getQueuePath(), 50); + csConf.setFloat("yarn.scheduler.capacity.root.user-settings.user_0." 
+ + CapacitySchedulerConfiguration.USER_WEIGHT, 1.5f); + a.setUserLimitFactor(2); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(16 * GB, 32)); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_1); // different user + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + // app_0 asks for 3 3-GB containers + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 3, true, + priority, recordFactory))); + + // app_1 asks for 2 1-GB containers + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + + /** + * Start testing... 
+ */ + + // There're two active users + assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers()); + + // 1 container to user_0 + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(3*GB, a.getUsedResources().getMemorySize()); + assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // Since user_0's minimum-user-limit-percent is 75%, and since user_0's app + // is first in the queue, app_0 gets the next container as well. + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(6*GB, a.getUsedResources().getMemorySize()); + assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // Now that user_0 is above its MULP, the next container should go to user_1 + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(7*GB, a.getUsedResources().getMemorySize()); + assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); + assertEquals(3*GB, + app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize()); + + assertEquals(1*GB, + app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -1243,7 +1351,11 @@ public void testHeadroomWithMaxCap() throws Exception { LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); //unset maxCapacity a.setMaxCapacity(1.0f); - + + // Set user-limit + a.setUserLimit(50); + 
a.setUserLimitFactor(2); + // Users final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -1299,10 +1411,6 @@ public void testHeadroomWithMaxCap() throws Exception { /** * Start testing... */ - - // Set user-limit - a.setUserLimit(50); - a.setUserLimitFactor(2); // Now, only user_0 should be active since he is the only one with // outstanding requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md index 737bdc2..903e83d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md @@ -123,7 +123,7 @@ Configuration | `yarn.scheduler.capacity..minimum-user-limit-percent` | Each queue enforces a limit on the percentage of resources allocated to a user at any given time, if there is demand for resources. The user limit can vary between a minimum and maximum value. The former (the minimum value) is set to this property value and the latter (the maximum value) depends on the number of users who have submitted applications. For e.g., suppose the value of this property is 25. If two users have submitted applications to a queue, no single user can use more than 50% of the queue resources. If a third user submits an application, no single user can use more than 33% of the queue resources. With 4 or more users, no user can use more than 25% of the queues resources. A value of 100 implies no user limits are imposed. The default is 100. Value is specified as a integer. | | `yarn.scheduler.capacity..user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. 
Value is specified as a float. | | `yarn.scheduler.capacity..maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. | -| `yarn.scheduler.capacity..maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. | +| `yarn.scheduler.capacity..user-settings..weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. This value is inherited by subqueues of queue-path. | * Running and Pending Application Limits