diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 43ec390..8e9f6bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -103,11 +103,17 @@ @Private public static final String USER_LIMIT = "minimum-user-limit-percent"; - + @Private public static final String USER_LIMIT_FACTOR = "user-limit-factor"; @Private + public static final String USER_WEIGHT = "weight"; + + @Private + public static final float DEFAULT_USER_WEIGHT = 1.0f; + + @Private public static final String STATE = "state"; @Private @@ -1392,4 +1398,30 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation); } + + /** + * Get the weight of a user. Used in computing user-specific user limit, + * relative to other users. + * @param queuePath full queue path + * @param user user name to check for a specific weight + * @return user-specific weight, if it exists. Otherwise, return 1.0f + */ + public float getUserWeight(String queuePath, String user) { + // Walk the queue's hierarchy from the bottom up. Look for a weight that is + // specific to user at each level and stop when the value is not the + // default. 
If none exists for this user at any level of the queue path, + // use the default. + float userWeight = DEFAULT_USER_WEIGHT; + int offset = queuePath.length(); + do { + String qp = queuePath.substring(0, offset); + String weightKey = + getQueuePrefix(qp) + "user-settings." + user + "." + USER_WEIGHT; + userWeight = getFloat(weightKey, DEFAULT_USER_WEIGHT); + if (userWeight != DEFAULT_USER_WEIGHT) { + return userWeight; + } + } while ((offset = queuePath.lastIndexOf(".", offset-1)) > -1); + return userWeight; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1b20556..a738a9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -138,7 +138,7 @@ public LeafQueue(CapacitySchedulerContext cs, this.scheduler = cs; this.usersManager = new UsersManager(metrics, this, labelManager, scheduler, - resourceCalculator); + resourceCalculator, csContext); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -617,11 +617,21 @@ public Resource calculateAndGetAMResourceLimit() { @VisibleForTesting public Resource getUserAMResourceLimit() { - return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); + return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL, + null); } public Resource 
getUserAMResourceLimitPerPartition( String nodePartition) { + return getUserAMResourceLimitPerPartition(nodePartition, null); + } + + public Resource getUserAMResourceLimitPerPartition( + String nodePartition, String userName) { + int userLimit = getUserLimit(); + if (userName != null && getUser(userName) != null) { + userLimit *= getUser(userName).getUserWeight(); + } try { readLock.lock(); /* @@ -630,7 +640,7 @@ public Resource getUserAMResourceLimitPerPartition( * the absolute queue capacity (per partition) instead of the max and is * modified by the userlimit and the userlimit factor as is the userlimit */ - float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, + float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); Resource queuePartitionResource = Resources @@ -772,7 +782,8 @@ private void activateApplications() { // Verify whether we already calculated user-am-limit for this label. if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAMLimit = getUserAMResourceLimitPerPartition(partitionName, + application.getUser()); userAmPartitionLimit.put(partitionName, userAMLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java index ff9d304..cfd7504 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java @@ -37,11 +37,14 @@ protected ResourceInfo AMResourceUsed; protected ResourceInfo userResourceLimit; protected ResourcesInfo resources; + protected float userWeight; + protected boolean isActive; UserInfo() {} UserInfo(String username, Resource resUsed, int activeApps, int pendingApps, - Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) { + Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage, + float weight, boolean isActive) { this.username = username; this.resourcesUsed = new ResourceInfo(resUsed); this.numActiveApplications = activeApps; @@ -49,6 +52,8 @@ this.AMResourceUsed = new ResourceInfo(amResUsed); this.userResourceLimit = new ResourceInfo(resourceLimit); this.resources = new ResourcesInfo(resourceUsage); + this.userWeight = weight; + this.isActive = isActive; } public String getUsername() { @@ -78,4 +83,12 @@ public ResourceInfo getUserResourceLimit() { public ResourcesInfo getResourceUsageInfo() { return resources; } + + public float getUserWeight() { + return userWeight; + } + + public boolean getIsActive() { + return isActive; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index c2134eb..3aac43a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -72,10 +72,6 @@ // To detect whether there is a change in user count for every user-limit // calculation. private AtomicLong latestVersionOfUsersState = new AtomicLong(0); - private Map> localVersionOfActiveUsersState = - new HashMap>(); - private Map> localVersionOfAllUsersState = - new HashMap>(); private volatile int userLimit; private volatile float userLimitFactor; @@ -88,9 +84,10 @@ private Map> usersApplications = new HashMap>(); - // Pre-computed list of user-limits. - Map> preComputedActiveUserLimit = new ConcurrentHashMap<>(); - Map> preComputedAllUserLimit = new ConcurrentHashMap<>(); + private int sumActiveUsersMinimumUserLimitPercent; + private int sumAllUsersMinimumUserLimitPercent; + + private CapacitySchedulerContext csContext; /** * UsageRatios will store the total used resources ratio across all users of @@ -159,6 +156,22 @@ private void setUsageRatio(String label, float ratio) { private UsageRatios userUsageRatios = new UsageRatios(); private WriteLock writeLock; + // User limits are kept per user, per partition, and per scheduling mode. + // Store this user's state and user limit, calculated relative to active users. + private Map> activeUserLimit = + new ConcurrentHashMap<>(); + private Map> activeUsersState = + new HashMap>(); + + // Store this user's state and user limit calculated relative to all users. + private Map> allUserLimit = + new ConcurrentHashMap<>(); + private Map> allUsersState = + new HashMap>(); + + // User-specific weight for computing user-specific minimum user limit pct. 
+ float weight; + public User(String name) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Nobody uses read-lock now, will add it when necessary @@ -253,6 +266,32 @@ public Resource getUserResourceLimit() { public void setUserResourceLimit(Resource userResourceLimit) { this.userResourceLimit = userResourceLimit; } + + public Map> + getActiveUserLimit() { + return activeUserLimit; + } + + public Map> + getAllUserLimit() { + return allUserLimit; + } + + public Map> getActiveUsersState() { + return activeUsersState; + } + + public Map> getAllUsersState() { + return allUsersState; + } + + public void setUserWeight(float userWeight) { + weight = userWeight; + } + + public float getUserWeight() { + return weight; + } } /* End of User class */ /** @@ -268,10 +307,13 @@ public void setUserResourceLimit(Resource userResourceLimit) { * Capacity Scheduler Context * @param resourceCalculator * rc + * @param ctxt + * capacity scheduler context */ public UsersManager(QueueMetrics metrics, LeafQueue lQueue, RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler, - ResourceCalculator resourceCalculator) { + ResourceCalculator resourceCalculator, + CapacitySchedulerContext ctxt) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.lQueue = lQueue; this.scheduler = scheduler; @@ -279,6 +321,7 @@ public UsersManager(QueueMetrics metrics, LeafQueue lQueue, this.resourceCalculator = resourceCalculator; this.qUsageRatios = new UsageRatios(); this.metrics = metrics; + this.csContext = ctxt; this.writeLock = lock.writeLock(); this.readLock = lock.readLock(); @@ -373,6 +416,8 @@ public void removeUser(String userName) { // Remove user from active/non-active list as well. 
activeUsersSet.remove(userName); nonActiveUsersSet.remove(userName); + computeSumActiveUsersMinimumUserLimitPercent(); + computeSumAllUsersMinimumUserLimitPercent(); } finally { writeLock.unlock(); } @@ -409,6 +454,31 @@ public User getUserAndAddIfAbsent(String userName) { */ private void addUser(String userName, User user) { this.users.put(userName, user); + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + user.setUserWeight(conf.getUserWeight(lQueue.getQueuePath(), userName)); + computeSumAllUsersMinimumUserLimitPercent(); + } + + // Add up the weighted minimum user limit percents for each user. + // Since each user can have its own weight, the MULP sum may not necessarily + // equal 100%. This will be normalized to 100% when computing an individual + // user's limit resource. + // Compute this only when a user is added or removed so that it doesn't need + // to be done in the computeUserLimit thread. + private void computeSumAllUsersMinimumUserLimitPercent() { + sumAllUsersMinimumUserLimitPercent = 0; + for (User u : users.values()) { + sumAllUsersMinimumUserLimitPercent += + Math.round(u.getUserWeight() * getUserLimit()); + } + } + + private void computeSumActiveUsersMinimumUserLimitPercent() { + sumActiveUsersMinimumUserLimitPercent = 0; + for (String userName : activeUsersSet) { + sumActiveUsersMinimumUserLimitPercent += + Math.round(getUser(userName).getUserWeight() * getUserLimit()); + } } /** @@ -425,7 +495,8 @@ private void addUser(String userName, User user) { user.getActiveApplications(), user.getPendingApplications(), Resources.clone(user.getConsumedAMResources()), Resources.clone(user.getUserResourceLimit()), - user.getResourceUsage())); + user.getResourceUsage(), user.getUserWeight(), + activeUsersSet.contains(user.userName))); } return usersToReturn; } finally { @@ -452,20 +523,25 @@ private void addUser(String userName, User user) { public Resource getComputedResourceLimitForActiveUsers(String userName, Resource 
clusterResource, String nodePartition, SchedulingMode schedulingMode) { + if (getUser(userName) == null + || getUser(userName).getActiveUserLimit() == null) { + return Resource.newInstance(0, 0); + } - Map userLimitPerSchedulingMode = preComputedActiveUserLimit - .get(nodePartition); + Map userLimitPerSchedulingMode = + getUser(userName).getActiveUserLimit().get(nodePartition); try { writeLock.lock(); - if (isRecomputeNeeded(schedulingMode, nodePartition, true)) { + if (isRecomputeNeeded(userName, schedulingMode, nodePartition, true)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition, clusterResource, schedulingMode, true); // update user count to cache so that we can avoid recompute if no major // changes. - setLocalVersionOfUsersState(nodePartition, schedulingMode, true); + setLocalVersionOfUsersState(getUser(userName), nodePartition, + schedulingMode, true); } } finally { writeLock.unlock(); @@ -499,20 +575,25 @@ public Resource getComputedResourceLimitForActiveUsers(String userName, public Resource getComputedResourceLimitForAllUsers(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode) { + if (getUser(userName) == null + || getUser(userName).getActiveUserLimit() == null) { + return Resource.newInstance(0, 0); + } - Map userLimitPerSchedulingMode = preComputedAllUserLimit - .get(nodePartition); + User user = getUser(userName); + Map userLimitPerSchedulingMode = + user.getAllUserLimit().get(nodePartition); try { writeLock.lock(); - if (isRecomputeNeeded(schedulingMode, nodePartition, false)) { + if (isRecomputeNeeded(userName, schedulingMode, nodePartition, false)) { // recompute userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition, clusterResource, schedulingMode, false); // update user count to cache so that we can avoid recompute if no major // changes. 
- setLocalVersionOfUsersState(nodePartition, schedulingMode, false); + setLocalVersionOfUsersState(getUser(userName), nodePartition, schedulingMode, false); } } finally { writeLock.unlock(); @@ -532,22 +613,22 @@ public Resource getComputedResourceLimitForAllUsers(String userName, * not exist in local map. 2. Total User count doesn't match with local cached * version. */ - private boolean isRecomputeNeeded(SchedulingMode schedulingMode, - String nodePartition, boolean isActive) { - return (getLocalVersionOfUsersState(nodePartition, schedulingMode, - isActive) != latestVersionOfUsersState.get()); + private boolean isRecomputeNeeded(String userName, + SchedulingMode schedulingMode, String nodePartition, boolean isActive) { + return (getLocalVersionOfUsersState(getUser(userName), nodePartition, + schedulingMode, isActive) != latestVersionOfUsersState.get()); } /* * Set Local version of user count per label to invalidate cache if needed. */ - private void setLocalVersionOfUsersState(String nodePartition, + private void setLocalVersionOfUsersState(User user, String nodePartition, SchedulingMode schedulingMode, boolean isActive) { try { writeLock.lock(); Map> localVersionOfUsersState = (isActive) - ? localVersionOfActiveUsersState - : localVersionOfAllUsersState; + ? user.getActiveUsersState() + : user.getAllUsersState(); Map localVersion = localVersionOfUsersState .get(nodePartition); @@ -565,13 +646,13 @@ private void setLocalVersionOfUsersState(String nodePartition, /* * Get Local version of user count per label to invalidate cache if needed. */ - private long getLocalVersionOfUsersState(String nodePartition, + private long getLocalVersionOfUsersState(User user, String nodePartition, SchedulingMode schedulingMode, boolean isActive) { try { this.readLock.lock(); Map> localVersionOfUsersState = (isActive) - ? localVersionOfActiveUsersState - : localVersionOfAllUsersState; + ? 
user.getActiveUsersState() + : user.getAllUsersState(); if (!localVersionOfUsersState.containsKey(nodePartition)) { return -1; @@ -593,11 +674,12 @@ private long getLocalVersionOfUsersState(String nodePartition, String nodePartition, Resource clusterResource, SchedulingMode schedulingMode, boolean activeMode) { + User user = getUser(userName); // preselect stored map as per active user-limit or all user computation. Map> computedMap = null; computedMap = (activeMode) - ? preComputedActiveUserLimit - : preComputedAllUserLimit; + ? user.getActiveUserLimit() + : user.getAllUserLimit(); Map userLimitPerSchedulingMode = computedMap .get(nodePartition); @@ -647,16 +729,19 @@ private Resource computeUserLimit(String userName, Resource clusterResource, queueCapacity, required); /* - * We want to base the userLimit calculation on max(queueCapacity, - * usedResources+required). However, we want usedResources to be based on - * the combined ratios of all the users in the queue so we use consumedRatio - * to calculate such. The calculation is dependent on how the - * resourceCalculator calculates the ratio between two Resources. DRF - * Example: If usedResources is greater than queueCapacity and users have - * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is - * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio - * is then 20+30=50%. Yes, this value can be larger than 100% but for the - * purposes of making sure all users are getting their fair share, it works. + * We want to base the userLimit calculation on + * max(queueCapacity, usedResources+required). However, we want + * usedResources to be based on the combined ratios of all the users in the + * queue so we use consumedRatio to calculate such. + * The calculation is dependent on how the resourceCalculator calculates the + * ratio between two Resources. 
DRF Example: If usedResources is greater + * than queueCapacity and users have the following [mem,cpu] usages: + * + * User1: [10%,20%] - Dominant resource is 20% + * User2: [30%,10%] - Dominant resource is 30% + * Then total consumedRatio is then 20+30=50%. Yes, this value can be + * larger than 100% but for the purposes of making sure all users are + * getting their fair share, it works. */ Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, partitionResource, getUsageRatio(nodePartition), @@ -674,22 +759,43 @@ private Resource computeUserLimit(String userName, Resource clusterResource, int usersCount = getNumActiveUsers(); Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition); + int sumUsersMinimumUserLimitPercent = sumActiveUsersMinimumUserLimitPercent; + // For non-activeUser calculation, consider all users count. if (!activeUser) { resourceUsed = currentCapacity; usersCount = users.size(); + sumUsersMinimumUserLimitPercent = sumAllUsersMinimumUserLimitPercent; + } + + // Since each user's MULP can be different, first make sure that every + // user's MULP has been met. Then, if any percentage is left over, divide + // that evenly among all users. 
+ // The calculation is: + // (current user's user-limit-percent) + + // ( (100 - min(sum(each user's user-limit-percent), 100)) + // / #users ) + User user = getUser(userName); + float localUserLimit = Math.round(user.getUserWeight() * getUserLimit()); + if (usersCount > 0) { + sumUsersMinimumUserLimitPercent = + Math.min(sumUsersMinimumUserLimitPercent, 100); + localUserLimit += ((100-sumUsersMinimumUserLimitPercent) / usersCount); } /* - * User limit resource is determined by: max{currentCapacity / #activeUsers, - * currentCapacity * user-limit-percentage%) + * User limit resource is determined by: + * max{% of active resources due to user, + * currentCapacity * user-limit-percentage%) */ Resource userLimitResource = Resources.max(resourceCalculator, partitionResource, - Resources.divideAndCeil(resourceCalculator, resourceUsed, - usersCount), Resources.divideAndCeil(resourceCalculator, - Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()), + Resources.multiplyAndRoundDown(resourceUsed, localUserLimit), + 100), + Resources.divideAndCeil(resourceCalculator, + Resources.multiplyAndRoundDown(currentCapacity, + user.getUserWeight() * getUserLimit()), 100)); // User limit is capped by maxUserLimit @@ -718,18 +824,28 @@ private Resource computeUserLimit(String userName, Resource clusterResource, lQueue.getMinimumAllocation()); if (LOG.isDebugEnabled()) { - LOG.debug("User limit computation for " + userName + " in queue " - + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit() - + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: " - + required + " consumed: " + consumed + " user-limit-resource: " - + userLimitResource + " queueCapacity: " + queueCapacity - + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed() - + " currentCapacity: " + currentCapacity + " activeUsers: " - + usersCount + " clusterCapacity: " + clusterResource - + " resourceByLabel: " + partitionResource + " usageratio: " - + getUsageRatio(nodePartition) + 
" Partition: " + nodePartition); + LOG.debug("User limit computation for " + userName + + " in queue: " + lQueue.getQueueName() + + " userLimitPercent=" + lQueue.getUserLimit() + + " userLimitFactor=" + lQueue.getUserLimitFactor() + + " required=" + required + + " consumed=" + consumed + + " user-limit-resource=" + userLimitResource + + " queueCapacity=" + queueCapacity + + " qconsumed=" + lQueue.getQueueResourceUsage().getUsed() + + " currentCapacity=" + currentCapacity + + " activeUsers=" + usersCount + + " clusterCapacity=" + clusterResource + + " resourceByLabel=" + partitionResource + + " usageratio=" + getUsageRatio(nodePartition) + + " Partition=" + nodePartition + + " userWeight=" + user.getUserWeight() + + " resourceUsed=" + resourceUsed + + " maxUserLimit=" + maxUserLimit + ); } getUser(userName).setUserResourceLimit(userLimitResource); + return userLimitResource; } @@ -841,6 +957,7 @@ private void updateActiveUsersResourceUsage(String userName) { if (nonActiveUsersSet.contains(userName)) { nonActiveUsersSet.remove(userName); activeUsersSet.add(userName); + computeSumActiveUsersMinimumUserLimitPercent(); // Update total resource usage of active and non-active after user // is moved from non-active to active. @@ -881,6 +998,7 @@ private void updateNonActiveUsersResourceUsage(String userName) { if (activeUsersSet.contains(userName)) { activeUsersSet.remove(userName); nonActiveUsersSet.add(userName); + computeSumActiveUsersMinimumUserLimitPercent(); // Update total resource usage of active and non-active after user is // moved from active to non-active. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index fea29bb..31fed84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -880,8 +880,8 @@ protected void getPendingAppDiagnosticMessage( .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName)); diagnosticMessage.append("; "); diagnosticMessage.append("User AM Resource Limit of the queue = "); - diagnosticMessage.append( - queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName)); + diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition( + appAMNodePartitionName, getUser())); diagnosticMessage.append("; "); diagnosticMessage.append("Queue AM Resource Usage = "); diagnosticMessage.append( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index b972428..dc6e8b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -68,6 +68,7 @@ "left:0%;background:none;border:1px dashed #BFBFBF"; static final String Q_OVER = "background:#FFA333"; static final String Q_UNDER = "background:#5BD75B"; + static final String ACTIVE_USER = "background:#FFFF00"; @RequestScoped static class CSQInfo { @@ -209,6 +210,7 @@ protected void render(Block html) { html.table("#userinfo").thead().$class("ui-widget-header").tr().th() .$class("ui-state-default")._("User Name")._().th() .$class("ui-state-default")._("Max Resource")._().th() + .$class("ui-state-default")._("Weight")._().th() .$class("ui-state-default")._("Used Resource")._().th() .$class("ui-state-default")._("Max AM Resource")._().th() .$class("ui-state-default")._("Used AM Resource")._().th() @@ -229,8 +231,11 @@ protected void render(Block html) { ResourceInfo amUsed = (resourceUsages.getAmUsed() == null) ? new ResourceInfo(Resources.none()) : resourceUsages.getAmUsed(); - tbody.tr().td(userInfo.getUsername()) + String highlightIfAsking = + userInfo.getIsActive() ? ACTIVE_USER : null; + tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername()) .td(userInfo.getUserResourceLimit().toString()) + .td(String.valueOf(userInfo.getUserWeight())) .td(resourcesUsed.toString()) .td(resourceUsages.getAMLimit().toString()) .td(amUsed.toString()) @@ -399,6 +404,8 @@ public void render(Block html) { _("Used (over capacity)")._(). span().$class("qlegend ui-corner-all ui-state-default"). _("Max Capacity")._(). + span().$class("qlegend ui-corner-all").$style(ACTIVE_USER). + _("Users Requesting Resources")._(). 
_(); float used = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3fbbae3..067f6ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -954,7 +954,113 @@ public void testUserLimits() throws Exception { // app_0 doesn't have outstanding resources, there's only one active user. assertEquals("There should only be 1 active user!", 1, a.getAbstractUsersManager().getNumActiveUsers()); + } + + @Test + public void testUserSpecificUserLimits() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Set user-limit + a.setUserLimit(50); + csConf.setUserLimit(a.getQueuePath(), 50); + csConf.setFloat("yarn.scheduler.capacity.root.user-settings.user_0." 
+ + CapacitySchedulerConfiguration.USER_WEIGHT, 1.5f); + a.setUserLimitFactor(2); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(16 * GB, 32)); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_1); // different user + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + // app_0 asks for 3 3-GB containers + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 3, true, + priority, recordFactory))); + + // app_1 asks for 2 1-GB containers + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + + /** + * Start testing... 
+ */ + + // There're two active users + assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers()); + + // 1 container to user_0 + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(3*GB, a.getUsedResources().getMemorySize()); + assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // Since user_0's minimum-user-limit-percent is 75%, and since user_0's app + // is first in the queue, app_0 gets the next container as well. + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(6*GB, a.getUsedResources().getMemorySize()); + assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // Now that user_0 is above its MULP, the next container should go to user_1 + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(7*GB, a.getUsedResources().getMemorySize()); + assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); + + assertEquals(3*GB, + app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize()); + + assertEquals(1*GB, + app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -1243,7 +1349,11 @@ public void testHeadroomWithMaxCap() throws Exception { LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); //unset maxCapacity a.setMaxCapacity(1.0f); - + + // Set user-limit + a.setUserLimit(50); + 
a.setUserLimitFactor(2); + // Users final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -1299,10 +1409,6 @@ public void testHeadroomWithMaxCap() throws Exception { /** * Start testing... */ - - // Set user-limit - a.setUserLimit(50); - a.setUserLimitFactor(2); // Now, only user_0 should be active since he is the only one with // outstanding requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md index 737bdc2..06c06f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md @@ -124,6 +124,7 @@ Configuration | `yarn.scheduler.capacity..user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. | | `yarn.scheduler.capacity..maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. | | `yarn.scheduler.capacity..maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. | +| `yarn.scheduler.capacity..user-settings..weight` | This floating point value is used when calculating the user limit resource values for users in a queue. 
This value will weight each user more or less than the other users in the queue. This value is inherited by subqueues of &lt;queue-path&gt;. | * Running and Pending Application Limits