From 2bbb02111517373e276143cca6fa59732a71d646 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Tue, 10 Jan 2017 16:53:18 +0530 Subject: [PATCH] YARN-5889 --- .../resourcemanager/UserToPartitionRecord.java | 86 ++++ .../capacity/FifoIntraQueuePreemptionPlugin.java | 6 +- .../scheduler/ActiveUsersManager.java | 52 ++- .../resourcemanager/scheduler/UsersManager.java | 461 +++++++++++++++++++++ .../capacity/CapacityHeadroomProvider.java | 5 +- .../scheduler/capacity/LeafQueue.java | 367 ++++------------ ...ionalCapacityPreemptionPolicyMockFramework.java | 10 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 3 +- .../scheduler/capacity/TestLeafQueue.java | 2 +- 9 files changed, 704 insertions(+), 288 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/UsersManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java new file mode 100644 index 0000000..a2b6344 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+public class UserToPartitionRecord
+    implements
+    Comparable<UserToPartitionRecord> {
+
+  private String userName;
+  private String partition;
+
+  public static UserToPartitionRecord newInstance(String userName,
+      String partition) {
+    UserToPartitionRecord userRecord = new UserToPartitionRecord();
+    userRecord.setPartition(partition);
+    userRecord.setUserName(userName);
+    return userRecord;
+  }
+
+  public String getPartition() {
+    return partition;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  @Override
+  public int compareTo(UserToPartitionRecord o) {
+    int result = this.getUserName().compareTo(o.getUserName());
+    if (result == 0) {
+      return this.getPartition().compareTo(o.getPartition());
+    } else {
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    UserToPartitionRecord other = (UserToPartitionRecord) obj;
+    if (!getUserName().equals(other.getUserName())
+        || !getPartition().equals(other.getPartition())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 493217;
+    int result = 8501;
+    result = prime * result + this.getUserName().hashCode();
+    result = prime * result + this.getPartition().hashCode();
+    return result;
+  }
+}
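The record above is the composite (user, partition) cache key, so equals(), hashCode(), and compareTo() must agree on field values rather than object references. A minimal stand-alone check of that contract (the demo class and the sample values are illustrative, not part of the patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;

    public class UserToPartitionRecordKeyDemo {
      public static void main(String[] args) {
        Map<UserToPartitionRecord, String> limits = new ConcurrentHashMap<>();
        limits.put(UserToPartitionRecord.newInstance("alice", "gpu"),
            "cached-limit");

        // A second instance built from equal strings must locate the same
        // entry; this only holds because equals()/hashCode() compare field
        // values, not references.
        String hit = limits.get(
            UserToPartitionRecord.newInstance("alice", "gpu"));
        System.out.println(hit); // prints: cached-limit
      }
    }

If equals() compared references, the second newInstance(...) lookup would always miss and every cache read would trigger a recomputation.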
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 757f567..a313465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
 
       // Verify whether we already calculated headroom for this user.
       if (userLimitResource == null) {
-        userLimitResource = Resources.clone(tq.leafQueue
-            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+        userLimitResource = Resources.clone(tq.leafQueue.getComputedUserLimit(
+            userName, clusterResource, tq.leafQueue.getUser(userName),
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
 
         Resource amUsed = perUserAMUsed.get(userName);
         if (null == amUsed) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e6858..8b4c2c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -45,7 +47,9 @@
   private int activeUsers = 0;
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
-  
+  private boolean userAddedOrRemoved = false;
+  private ResourceUsage totalResUsedByActiveUsers = new ResourceUsage();
+
   public ActiveUsersManager(QueueMetrics metrics) {
     this.metrics = metrics;
   }
@@ -65,6 +69,7 @@ synchronized public void activateApplication(
       usersApplications.put(user, userApps);
       ++activeUsers;
       metrics.incrActiveUsers();
+      resetUserAddedOrRemoved(true);
       LOG.debug("User " + user + " added to activeUsers, currently: "
           + activeUsers);
     }
@@ -91,12 +96,13 @@ synchronized public void deactivateApplication(
         usersApplications.remove(user);
         --activeUsers;
         metrics.decrActiveUsers();
+        resetUserAddedOrRemoved(true);
         LOG.debug("User " + user + " removed from activeUsers, currently: "
             + activeUsers);
       }
     }
   }
-  
+
   /**
    * Get number of active users i.e. users with applications which have pending
    * resource requests.
@@ -106,4 +112,46 @@ synchronized public void deactivateApplication(
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }
+
+  /**
+   * Get active users i.e. users with applications which have pending
+   * resource requests.
+   * @return an unmodifiable collection of active user names
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public Collection<String> getActiveUsers() {
+    return Collections.unmodifiableCollection(usersApplications.keySet());
+  }
+
+  /**
+   * Check whether the given user is active.
+   * @param user
+   *          user name
+   * @return whether the given user is active or not
+   */
+  @Lock({Queue.class})
+  synchronized public boolean isAnActiveUser(String user) {
+    return usersApplications.containsKey(user);
+  }
+
+  /**
+   * Check whether any user has become active or inactive.
+   * @return true if the active user set changed since the last reset
+   */
+  public boolean isUserAddedOrRemoved() {
+    return userAddedOrRemoved;
+  }
+
+  /**
+   * Set or clear the flag tracking changes to the active user set.
+   * @param reset new flag value
+   */
+  @Lock({Queue.class})
+  public void resetUserAddedOrRemoved(boolean reset) {
+    this.userAddedOrRemoved = reset;
+  }
+
+  public ResourceUsage getTotalResUsedByActiveUsers() {
+    return totalResUsedByActiveUsers;
+  }
 }
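A note on the new getActiveUsers() accessor: it returns an unmodifiable view backed by the live key set, so the set can still change underneath an iterator once the synchronized call returns. A minimal copy-then-iterate sketch (the helper class and method name are illustrative, not part of the patch; locking on the manager object matches its synchronized methods, which lock this):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;

    class ActiveUsersSnapshotDemo {
      // Copy the view inside the manager's monitor, then iterate lock-free.
      static List<String> snapshotActiveUsers(ActiveUsersManager aum) {
        synchronized (aum) {
          return new ArrayList<>(aum.getActiveUsers());
        }
      }
    }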
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/UsersManager.java
new file mode 100644
index 0000000..8ae46bc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/UsersManager.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.UsageRatios;
+import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link UsersManager} tracks users in the system.
+ */
+@Private
+public class UsersManager {
+
+  private static final Log LOG = LogFactory.getLog(UsersManager.class);
+
+  @VisibleForTesting
+  public static class User {
+    ResourceUsage userResourceUsage = new ResourceUsage();
+    ActiveUsersManager activeUsersManager = null;
+    String userName = null;
+    volatile Resource userResourceLimit = Resource.newInstance(0, 0);
+    volatile int pendingApplications = 0;
+    volatile int activeApplications = 0;
+    volatile int cachedULCount = 0;
+    private UsageRatios userUsageRatios = new UsageRatios();
+    private WriteLock writeLock;
+
+    public User(ActiveUsersManager activeUsersManager, String name) {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      // Nobody uses read-lock now, will add it when necessary
+      writeLock = lock.writeLock();
+      this.activeUsersManager = activeUsersManager;
+      this.userName = name;
+    }
+
+    public ResourceUsage getResourceUsage() {
+      return userResourceUsage;
+    }
+
+    public float resetAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      try {
+        writeLock.lock();
+        userUsageRatios.setUsageRatio(nodePartition, 0);
+        return updateUsageRatio(resourceCalculator, resource, nodePartition);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public float updateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      try {
+        writeLock.lock();
+        float delta;
+        float newRatio = Resources.ratio(resourceCalculator,
+            getUsed(nodePartition), resource);
+        delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+        userUsageRatios.setUsageRatio(nodePartition, newRatio);
+        return delta;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public Resource getUsed() {
+      return userResourceUsage.getUsed();
+    }
+
+    public Resource getAllUsed() {
+      return userResourceUsage.getAllUsed();
+    }
+
+    public Resource getUsed(String label) {
+      return userResourceUsage.getUsed(label);
+    }
+
+    public int getPendingApplications() {
+      return pendingApplications;
+    }
+
+    public int getActiveApplications() {
+      return activeApplications;
+    }
+
+    public Resource getConsumedAMResources() {
+      return userResourceUsage.getAMUsed();
+    }
+
+    public Resource getConsumedAMResources(String label) {
+      return userResourceUsage.getAMUsed(label);
+    }
+
+    public int getTotalApplications() {
+      return getPendingApplications() + getActiveApplications();
+    }
+
+    public void submitApplication() {
+      try {
+        writeLock.lock();
+        ++pendingApplications;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public void activateApplication() {
+      try {
+        writeLock.lock();
+        --pendingApplications;
+        ++activeApplications;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public void finishApplication(boolean wasActive) {
+      try {
+        writeLock.lock();
+        if (wasActive) {
+          --activeApplications;
+        } else {
+          --pendingApplications;
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public void assignContainer(Resource resource, String nodePartition) {
+      userResourceUsage.incUsed(nodePartition, resource);
+
+      // Add to active user's total resource as well.
+      if (activeUsersManager.isAnActiveUser(userName)) {
+        activeUsersManager.getTotalResUsedByActiveUsers().incUsed(nodePartition,
+            resource);
+      }
+    }
+
+    public void releaseContainer(Resource resource, String nodePartition) {
+      userResourceUsage.decUsed(nodePartition, resource);
+
+      // Decrement from active user's total resource as well.
+      if (activeUsersManager.isAnActiveUser(userName)) {
+        activeUsersManager.getTotalResUsedByActiveUsers().decUsed(nodePartition,
+            resource);
+      }
+    }
+
+    public Resource getUserResourceLimit() {
+      return userResourceLimit;
+    }
+
+    public void setUserResourceLimit(Resource userResourceLimit) {
+      this.userResourceLimit = userResourceLimit;
+    }
+
+    public void setCachedULCount(int ulCount) {
+      try {
+        writeLock.lock();
+        this.cachedULCount = ulCount;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public int getCachedULCount() {
+      return cachedULCount;
+    }
+  }
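How the per-user application counters are expected to move through the submit, activate, and finish transitions above, as a hypothetical walk-through (the demo class is illustrative; the constructor and methods are the ones defined in this patch):

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager;

    class UserCountersDemo {
      static void walkThrough(QueueMetrics metrics) {
        ActiveUsersManager aum = new ActiveUsersManager(metrics);
        UsersManager.User user = new UsersManager.User(aum, "alice");

        user.submitApplication();      // pending=1, active=0
        user.activateApplication();    // pending=0, active=1
        user.finishApplication(true);  // wasActive -> active=0
        assert user.getTotalApplications() == 0;
      }
    }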
+
+  private final LeafQueue lQueue;
+  private ActiveUsersManager activeUsersManager = null;
+  final RMNodeLabelsManager labelManager;
+
+  // Pre-computed user-limits, keyed by (user, partition) and scheduling mode.
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedActiveUserLimit = new ConcurrentHashMap<>();
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedAllUserLimit = new ConcurrentHashMap<>();
+
+  public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+      RMNodeLabelsManager labelManager) {
+    this.activeUsersManager = new ActiveUsersManager(metrics);
+    this.lQueue = lQueue;
+    this.labelManager = labelManager;
+  }
+
+  public ActiveUsersManager getActiveUsersManager() {
+    return activeUsersManager;
+  }
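The two maps above implement a stale-unless-current cache: invalidation never clears entries; instead LeafQueue's recalculateULCount is bumped, and each User carries the count its limit was computed against (cachedULCount). A hypothetical reduction of the scheme to its essentials (class and method names are mine, not the patch's; the single-writer assumption mirrors the scheduler's allocation thread):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    // Illustrative only: K is the (user, partition) key, V the per-mode limits.
    class EpochCache<K, V> {
      private final ConcurrentHashMap<K, V> values = new ConcurrentHashMap<>();
      private final ConcurrentHashMap<K, Integer> epochs = new ConcurrentHashMap<>();
      private volatile int epoch = 0; // assumes a single writer thread

      void invalidateAll() {
        epoch++;   // O(1): no entry is touched, everything is now stale
      }

      V get(K key, Supplier<V> recompute) {
        Integer seen = epochs.get(key);
        V value = values.get(key);
        if (value == null || seen == null || seen != epoch) {
          value = recompute.get();
          values.put(key, value);
          epochs.put(key, epoch);
        }
        return value;
      }
    }

The design choice is that invalidation is constant-time regardless of how many (user, partition) entries exist; the cost of recomputation is paid lazily at the next lookup.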
+
+  /**
+   * Get the user-limit for an active user, recomputing it if stale.
+   * @param userName
+   *          name of user who has submitted one or more apps to given queue
+   * @param clusterResource
+   *          total cluster resource
+   * @param user
+   *          user who owns the applications in this queue
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return computed user-limit
+   */
+  public Resource getComputedActiveUserLimit(String userName,
+      Resource clusterResource, User user, String nodePartition,
+      SchedulingMode schedulingMode, ResourceCalculator rc) {
+
+    // In allocation thread, cross check whether user became active or not.
+    // Such a change needs recomputation for all user-limits.
+    if (activeUsersManager.isUserAddedOrRemoved()) {
+      // Clear computed user-limits and ask for recomputation.
+      lQueue.setRecalculateULCount(lQueue.getRecalculateULCount() + 1);
+
+      // Reset the flag so that we can keep tracking active-user changes.
+      activeUsersManager.resetUserAddedOrRemoved(false);
+    }
+
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedActiveUserLimit.get(userRecord);
+
+    if (userLimitPerSchedulingMode == null
+        || user.getCachedULCount() != lQueue.getRecalculateULCount()) {
+      userLimitPerSchedulingMode = reComputeUserLimits(rc, userName,
+          nodePartition, clusterResource, true);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
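An aside on the check-then-reset sequence in getComputedActiveUserLimit() above (purely illustrative; the patch keeps the two-step flag): reading isUserAddedOrRemoved() and then calling resetUserAddedOrRemoved(false) are separate operations, so two allocation threads could both observe true. The usual lock-free idiom folds the pair into one atomic step:

    import java.util.concurrent.atomic.AtomicBoolean;

    class RecalcFlagSketch {
      private final AtomicBoolean userSetChanged = new AtomicBoolean(false);
      private volatile int recalculateULCount = 0; // mirrors LeafQueue's counter

      void onUserActivatedOrDeactivated() {
        userSetChanged.set(true);
      }

      void beforeUserLimitLookup() {
        // compareAndSet consumes the flag exactly once per change burst.
        if (userSetChanged.compareAndSet(true, false)) {
          recalculateULCount++;
        }
      }
    }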
+
+  /**
+   * Get the user-limit computed across all users, recomputing it if stale.
+   * @param userName
+   *          name of user who has submitted one or more apps to given queue
+   * @param clusterResource
+   *          total cluster resource
+   * @param user
+   *          user who owns the applications in this queue
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return computed user-limit
+   */
+  public Resource getComputedUserLimit(String userName,
+      Resource clusterResource, User user, String nodePartition,
+      SchedulingMode schedulingMode, ResourceCalculator rc) {
+
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedAllUserLimit.get(userRecord);
+
+    if (userLimitPerSchedulingMode == null
+        || user.getCachedULCount() != lQueue.getRecalculateULCount()) {
+      userLimitPerSchedulingMode = reComputeUserLimits(rc, userName,
+          nodePartition, clusterResource, false);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
+
+  private Map<SchedulingMode, Resource> reComputeUserLimits(
+      ResourceCalculator rc, String userName, String nodePartition,
+      Resource clusterResource, boolean activeMode) {
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    Map<UserToPartitionRecord, Map<SchedulingMode, Resource>> computedMap =
+        null;
+
+    // Preselect the stored map depending on whether limits are computed for
+    // active users only or for all users.
+    if (activeMode) {
+      computedMap = preComputedActiveUserLimit;
+    } else {
+      computedMap = preComputedAllUserLimit;
+    }
+
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
+        .get(userRecord);
+
+    if (userLimitPerSchedulingMode == null) {
+      userLimitPerSchedulingMode = new ConcurrentHashMap<>();
+      computedMap.put(userRecord, userLimitPerSchedulingMode);
+    }
+
+    userLimitPerSchedulingMode.clear();
+
+    // Compute user-limit per scheduling mode.
+    Resource userLimitWithExclusivity = computeUserLimit(rc, userName,
+        clusterResource, lQueue.getUser(userName), nodePartition,
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, activeMode);
+    Resource userLimitWithOutExclusivity = computeUserLimit(rc, userName,
+        clusterResource, lQueue.getUser(userName), nodePartition,
+        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY, activeMode);
+
+    // Update in local storage.
+    userLimitPerSchedulingMode.put(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+        userLimitWithExclusivity);
+    userLimitPerSchedulingMode.put(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY,
+        userLimitWithOutExclusivity);
+
+    return userLimitPerSchedulingMode;
+  }
+
+  @Lock(NoLock.class)
+  private Resource computeUserLimit(ResourceCalculator resourceCalculator,
+      String userName, Resource clusterResource, User user,
+      String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
+    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
+
+    // What is our current capacity?
+    // * It is equal to the max(required, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with miniscule capacity (< 1 slot) make progress
+    // * If we're running over capacity, then its
+    //   (usedResources + required) (which extra resources we are allocating)
+    Resource queueCapacity = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, partitionResource,
+        lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
+        lQueue.getMinimumAllocation());
+
+    // Assume we have required resource equals to minimumAllocation, this can
+    // make sure user limit can continuously increase till queueMaxResource
+    // reached.
+    Resource required = lQueue.getMinimumAllocation();
+
+    // Allow progress for queues with miniscule capacity
+    queueCapacity = Resources.max(resourceCalculator, partitionResource,
+        queueCapacity, required);
+
+    /*
+     * We want to base the userLimit calculation on max(queueCapacity,
+     * usedResources+required). However, we want usedResources to be based on
+     * the combined ratios of all the users in the queue so we use
+     * consumedRatio to calculate such. The calculation is dependent on how
+     * the resourceCalculator calculates the ratio between two Resources. DRF
+     * Example: If usedResources is greater than queueCapacity and users have
+     * the following [mem,cpu] usages:
+     *   User1: [10%,20%] - Dominant resource is 20%
+     *   User2: [30%,10%] - Dominant resource is 30%
+     * The total consumedRatio is then 20+30=50%. Yes, this value can be
+     * larger than 100% but for the purposes of making sure all users are
+     * getting their fair share, it works.
+     */
+    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+        partitionResource, lQueue.getUsageRatio(nodePartition),
+        lQueue.getMinimumAllocation());
+    Resource currentCapacity = Resources.lessThan(resourceCalculator,
+        partitionResource, consumed, queueCapacity)
+            ? queueCapacity
+            : Resources.add(consumed, required);
+    // Never allow a single user to take more than the
+    // queue's configured capacity * user-limit-factor.
+    // Also, the queue's configured capacity should be higher than
+    // queue-hard-limit * ulMin
+
+    Resource resourceUsed = Resources.createResource(0, 0);
+    int activeUsersCount = activeUsersManager.getNumActiveUsers();
+
+    // For the active-user calculation, consider only the resources used by
+    // active users.
+    if (activeUser) {
+      resourceUsed = activeUsersManager.getTotalResUsedByActiveUsers()
+          .getUsed(nodePartition);
+    } else {
+      resourceUsed = currentCapacity;
+    }
+
+    // User limit resource is determined by:
+    // max(resourceUsed / #activeUsers,
+    //     currentCapacity * user-limit-percentage%)
+    Resource userLimitResource = Resources.max(resourceCalculator,
+        partitionResource,
+        Resources.divideAndCeil(resourceCalculator, resourceUsed,
+            activeUsersCount),
+        Resources.divideAndCeil(resourceCalculator, Resources
+            .multiplyAndRoundDown(currentCapacity, lQueue.getUserLimit()),
+            100));
+
+    // User limit is capped by maxUserLimit
+    // - maxUserLimit = queueCapacity * user-limit-factor
+    //   (RESPECT_PARTITION_EXCLUSIVITY)
+    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
+    //
+    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+    // partition, its guaranteed resource on that partition is 0. And
+    // user-limit-factor computation is based on queue's guaranteed capacity.
+    // So we will not cap user-limit as well as used resource when doing
+    // IGNORE_PARTITION_EXCLUSIVITY allocation.
+    Resource maxUserLimit = Resources.none();
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
+          lQueue.getUserLimitFactor());
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = partitionResource;
+    }
+
+    // Cap final user limit with maxUserLimit
+    userLimitResource = Resources
+        .roundUp(resourceCalculator,
+            Resources.min(resourceCalculator, partitionResource,
+                userLimitResource, maxUserLimit),
+            lQueue.getMinimumAllocation());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computation for " + userName
+          + " in queue " + lQueue.getQueueName()
+          + " userLimitPercent=" + lQueue.getUserLimit()
+          + " userLimitFactor=" + lQueue.getUserLimitFactor()
+          + " required: " + required
+          + " consumed: " + consumed
+          + " user-limit-resource: " + userLimitResource
+          + " queueCapacity: " + queueCapacity
+          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
+          + " currentCapacity: " + currentCapacity
+          + " activeUsers: " + activeUsersCount
+          + " clusterCapacity: " + clusterResource
+          + " resourceByLabel: " + partitionResource
+          + " usageratio: " + lQueue.getUsageRatio(nodePartition)
+          + " Partition: " + nodePartition);
+    }
+    user.setUserResourceLimit(userLimitResource);
+    return userLimitResource;
+  }
+}
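A worked example of the formula in computeUserLimit() above, with hypothetical numbers and memory-only arithmetic (the real code goes through ResourceCalculator and additionally rounds the result up to a multiple of minimumAllocation, which this sketch omits):

    public class UserLimitArithmeticDemo {
      public static void main(String[] args) {
        long queueCapacityMB = 10240;  // queue guarantee on the partition
        long consumedMB = 12288;       // all users combined, over guarantee
        long requiredMB = 1024;        // assumed minimumAllocation
        int activeUsers = 3;
        int userLimitPercent = 25;     // minimum-user-limit-percent
        float userLimitFactor = 1.5f;

        // currentCapacity = consumed + required once the queue is over capacity.
        long currentCapacityMB = consumedMB > queueCapacityMB
            ? consumedMB + requiredMB : queueCapacityMB;              // 13312

        // All-users path: resourceUsed == currentCapacity.
        long evenShareMB = (long) Math.ceil(
            currentCapacityMB / (double) activeUsers);                // 4438
        long percentFloorMB = currentCapacityMB * userLimitPercent / 100; // 3328
        long userLimitMB = Math.max(evenShareMB, percentFloorMB);     // 4438

        // Cap by queueCapacity * user-limit-factor
        // (RESPECT_PARTITION_EXCLUSIVITY mode).
        long maxUserLimitMB = (long) (queueCapacityMB * userLimitFactor); // 15360
        userLimitMB = Math.min(userLimitMB, maxUserLimitMB);          // 4438

        System.out.println("user limit (MB) = " + userLimitMB);
      }
    }

In the active-users path, resourceUsed would instead be the total used by active users on the partition, which makes the even-share term shrink when inactive users hold resources.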
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index 5605f18..ef7698f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -21,17 +21,18 @@
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class CapacityHeadroomProvider {
 
-  LeafQueue.User user;
+  UsersManager.User user;
   LeafQueue queue;
   FiCaSchedulerApp application;
   LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
 
-  public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+  public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
     FiCaSchedulerApp application,
     LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 18b38f4..688e2a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -67,6 +68,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -80,7 +82,6 @@
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -118,8 +119,8 @@
       RecordFactoryProvider.getRecordFactory(null);
 
   private CapacitySchedulerContext scheduler;
-  
-  private final ActiveUsersManager activeUsersManager;
+
+  private final UsersManager usersManager;
 
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
@@ -140,13 +141,23 @@
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
       new ConcurrentHashMap<>();
 
+  // Pre-computed user-limits, keyed by (user, partition) and scheduling mode.
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedActiveUserLimit = new ConcurrentHashMap<>();
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedAllUserLimit = new ConcurrentHashMap<>();
+
+  // To detect whether there is a change in user count for every user-limit
+  // calculation.
+ private int recalculateULCount = 0; + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.usersManager = new UsersManager(metrics, this, labelManager); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -318,7 +329,7 @@ public int getMaxApplicationsPerUser() { @Override public ActiveUsersManager getActiveUsersManager() { - return activeUsersManager; + return usersManager.getActiveUsersManager(); } @Override @@ -466,7 +477,7 @@ private User getUserAndAddIfAbsent(String userName) { writeLock.lock(); User u = users.get(userName); if (null == u) { - u = new User(); + u = new User(usersManager.getActiveUsersManager(), userName); users.put(userName, u); } return u; @@ -879,7 +890,8 @@ private void addApplicationAttempt(FiCaSchedulerApp application, @Override public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager - activeUsersManager.deactivateApplication(user, application); + usersManager.getActiveUsersManager().deactivateApplication(user, + application); appFinished(); @@ -1266,7 +1278,7 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, String partition) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application.getUser(), clusterResource, user, + getComputedActiveUserLimit(application.getUser(), clusterResource, user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); } @@ -1339,9 +1351,8 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also - Resource userLimit = - computeUserLimit(application.getUser(), clusterResource, queueUser, - nodePartition, schedulingMode); + Resource userLimit = getComputedActiveUserLimit(application.getUser(), + clusterResource, queueUser, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1377,129 +1388,52 @@ public boolean getRackLocalityFullReset() { return rackLocalityFullReset; } - @Lock(NoLock.class) - private Resource computeUserLimit(String userName, - Resource clusterResource, User user, - String nodePartition, SchedulingMode schedulingMode) { - Resource partitionResource = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // What is our current capacity? - // * It is equal to the max(required, queue-capacity) if - // we're running below capacity. The 'max' ensures that jobs in queues - // with miniscule capacity (< 1 slot) make progress - // * If we're running over capacity, then its - // (usedResources + required) (which extra resources we are allocating) - Resource queueCapacity = - Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. 
+ * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getComputedActiveUserLimit(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode) { - // Assume we have required resource equals to minimumAllocation, this can - // make sure user limit can continuously increase till queueMaxResource - // reached. - Resource required = minimumAllocation; - - // Allow progress for queues with miniscule capacity - queueCapacity = - Resources.max( - resourceCalculator, partitionResource, - queueCapacity, - required); - - - /* We want to base the userLimit calculation on - * max(queueCapacity, usedResources+required). However, we want - * usedResources to be based on the combined ratios of all the users in the - * queue so we use consumedRatio to calculate such. - * The calculation is dependent on how the resourceCalculator calculates the - * ratio between two Resources. DRF Example: If usedResources is - * greater than queueCapacity and users have the following [mem,cpu] usages: - * User1: [10%,20%] - Dominant resource is 20% - * User2: [30%,10%] - Dominant resource is 30% - * Then total consumedRatio is then 20+30=50%. Yes, this value can be - * larger than 100% but for the purposes of making sure all users are - * getting their fair share, it works. - */ - Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, qUsageRatios.getUsageRatio(nodePartition), - minimumAllocation); - Resource currentCapacity = - Resources.lessThan(resourceCalculator, partitionResource, consumed, - queueCapacity) ? queueCapacity : Resources.add(consumed, required); - // Never allow a single user to take more than the - // queue's configured capacity * user-limit-factor. - // Also, the queue's configured capacity should be higher than - // queue-hard-limit * ulMin - - final int activeUsers = activeUsersManager.getNumActiveUsers(); - - // User limit resource is determined by: - // max{currentCapacity / #activeUsers, currentCapacity * - // user-limit-percentage%) - Resource userLimitResource = Resources.max( - resourceCalculator, partitionResource, - Resources.divideAndCeil( - resourceCalculator, currentCapacity, activeUsers), - Resources.divideAndCeil( - resourceCalculator, - Resources.multiplyAndRoundDown( - currentCapacity, userLimit), - 100) - ); - - // User limit is capped by maxUserLimit - // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY) - // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) - // - // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a - // partition, its guaranteed resource on that partition is 0. And - // user-limit-factor computation is based on queue's guaranteed capacity. So - // we will not cap user-limit as well as used resource when doing - // IGNORE_PARTITION_EXCLUSIVITY allocation. 
- Resource maxUserLimit = Resources.none(); - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxUserLimit = - Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor); - } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - maxUserLimit = partitionResource; - } - - // Cap final user limit with maxUserLimit - userLimitResource = - Resources.roundUp( - resourceCalculator, - Resources.min( - resourceCalculator, partitionResource, - userLimitResource, - maxUserLimit - ), - minimumAllocation); + return usersManager.getComputedActiveUserLimit(userName, clusterResource, + user, nodePartition, schedulingMode, resourceCalculator); + } - if (LOG.isDebugEnabled()) { - LOG.debug("User limit computation for " + userName + - " in queue " + getQueueName() + - " userLimitPercent=" + userLimit + - " userLimitFactor=" + userLimitFactor + - " required: " + required + - " consumed: " + consumed + - " user-limit-resource: " + userLimitResource + - " queueCapacity: " + queueCapacity + - " qconsumed: " + queueUsage.getUsed() + - " consumedRatio: " + totalUserConsumedRatio + - " currentCapacity: " + currentCapacity + - " activeUsers: " + activeUsers + - " clusterCapacity: " + clusterResource + - " resourceByLabel: " + partitionResource + - " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) + - " Partition: " + nodePartition - ); - } - user.setUserResourceLimit(userLimitResource); - return userLimitResource; + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getComputedUserLimit(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode) { + + return usersManager.getComputedUserLimit(userName, clusterResource, user, + nodePartition, schedulingMode, resourceCalculator); } - + @Private protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, @@ -1750,6 +1684,10 @@ void allocateResource(Resource clusterResource, user.assignContainer(resource, nodePartition); + // New container is allocated. Invalidate user-limit for given user. + getUser(application.getUser()) + .setCachedULCount(getRecalculateULCount() - 1); + // Update usage ratios updateQueueUsageRatio(nodePartition, user.updateUsageRatio(resourceCalculator, resourceByLabel, @@ -1799,6 +1737,9 @@ void releaseResource(Resource clusterResource, User user = getUserAndAddIfAbsent(userName); user.releaseContainer(resource, nodePartition); + // Container is released. Invalidate user-limit. + getUser(application.getUser()).setCachedULCount(getRecalculateULCount() - 1); + // Update usage ratios updateQueueUsageRatio(nodePartition, user.updateUsageRatio(resourceCalculator, resourceByLabel, @@ -1861,6 +1802,10 @@ public void updateClusterResource(Resource clusterResource, // activate the pending applications if possible activateApplications(); + // In case of any resource change, invalidate recalculateULCount to clear + // the computed user-limit. 
+ setRecalculateULCount(getRecalculateULCount() + 1); + // Update application properties for (FiCaSchedulerApp application : orderingPolicy .getSchedulableEntities()) { @@ -1876,16 +1821,14 @@ public void updateClusterResource(Resource clusterResource, @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel, - resourceToInc); + getUser(application.getUser()).assignContainer(resourceToInc, nodeLabel); super.incUsedResource(nodeLabel, resourceToInc, application); } @Override public void decUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel, - resourceToDec); + getUser(application.getUser()).releaseContainer(resourceToDec, nodeLabel); super.decUsedResource(nodeLabel, resourceToDec, application); } @@ -1908,7 +1851,7 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec, /* * Usage Ratio */ - static private class UsageRatios { + static public class UsageRatios { private Map usageRatios; private ReadLock readLock; private WriteLock writeLock; @@ -1934,7 +1877,7 @@ private void incUsageRatio(String label, float delta) { } } - float getUsageRatio(String label) { + public float getUsageRatio(String label) { try { readLock.lock(); Float f = usageRatios.get(label); @@ -1947,7 +1890,7 @@ float getUsageRatio(String label) { } } - private void setUsageRatio(String label, float ratio) { + public void setUsageRatio(String label, float ratio) { try { writeLock.lock(); usageRatios.put(label, new Float(ratio)); @@ -1962,134 +1905,6 @@ public float getUsageRatio(String label) { return qUsageRatios.getUsageRatio(label); } - @VisibleForTesting - public static class User { - ResourceUsage userResourceUsage = new ResourceUsage(); - volatile Resource userResourceLimit = Resource.newInstance(0, 0); - volatile int pendingApplications = 0; - volatile int activeApplications = 0; - private UsageRatios userUsageRatios = new UsageRatios(); - private WriteLock writeLock; - - User() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - // Nobody uses read-lock now, will add it when necessary - writeLock = lock.writeLock(); - } - - public ResourceUsage getResourceUsage() { - return userResourceUsage; - } - - public float resetAndUpdateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - userUsageRatios.setUsageRatio(nodePartition, 0); - return updateUsageRatio(resourceCalculator, resource, nodePartition); - } finally { - writeLock.unlock(); - } - } - - public float updateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - float delta; - float newRatio = Resources.ratio(resourceCalculator, - getUsed(nodePartition), resource); - delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); - userUsageRatios.setUsageRatio(nodePartition, newRatio); - return delta; - } finally { - writeLock.unlock(); - } - } - - public Resource getUsed() { - return userResourceUsage.getUsed(); - } - - public Resource getAllUsed() { - return userResourceUsage.getAllUsed(); - } - - public Resource getUsed(String label) { - return userResourceUsage.getUsed(label); - } - - public int getPendingApplications() { - return pendingApplications; - } - - public int getActiveApplications() { - return activeApplications; - } 
- - public Resource getConsumedAMResources() { - return userResourceUsage.getAMUsed(); - } - - public Resource getConsumedAMResources(String label) { - return userResourceUsage.getAMUsed(label); - } - - public int getTotalApplications() { - return getPendingApplications() + getActiveApplications(); - } - - public void submitApplication() { - try { - writeLock.lock(); - ++pendingApplications; - } finally { - writeLock.unlock(); - } - } - - public void activateApplication() { - try { - writeLock.lock(); - --pendingApplications; - ++activeApplications; - } finally { - writeLock.unlock(); - } - } - - public void finishApplication(boolean wasActive) { - try { - writeLock.lock(); - if (wasActive) { - --activeApplications; - } else{ - --pendingApplications; - } - } finally { - writeLock.unlock(); - } - } - - public void assignContainer(Resource resource, String nodePartition) { - userResourceUsage.incUsed(nodePartition, resource); - } - - public void releaseContainer(Resource resource, String nodePartition) { - userResourceUsage.decUsed(nodePartition, resource); - } - - public Resource getUserResourceLimit() { - return userResourceLimit; - } - - public void setUserResourceLimit(Resource userResourceLimit) { - this.userResourceLimit = userResourceLimit; - } - } - @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt attempt, RMContainer rmContainer) { @@ -2155,7 +1970,7 @@ public Resource getTotalPendingResourcesConsideringUserLimit( if (!userNameToHeadroom.containsKey(userName)) { User user = getUser(userName); Resource headroom = Resources.subtract( - computeUserLimit(app.getUser(), resources, user, partition, + getComputedActiveUserLimit(app.getUser(), resources, user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), user.getUsed(partition)); // Make sure headroom is not negative. 
@@ -2177,16 +1992,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
     }
 
-  public synchronized Resource getUserLimitPerUser(String userName,
-      Resource resources, String partition) {
-
-    // Check user resource limit
-    User user = getUser(userName);
-
-    return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-  }
-
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
@@ -2445,4 +2250,12 @@ public void stopQueue() {
       writeLock.unlock();
     }
   }
+
+  public int getRecalculateULCount() {
+    return recalculateULCount;
+  }
+
+  public void setRecalculateULCount(int recalculateULCount) {
+    this.recalculateULCount = recalculateULCount;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 83ff52b..c106178 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager.User;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -359,9 +361,11 @@ private void mockApplications(String appsConfig) {
           queue.getQueueCapacities().getAbsoluteCapacity());
       HashSet<String> users = userMap.get(queue.getQueueName());
       Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
-      for (String user : users) {
-        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
-            anyString())).thenReturn(userLimit);
+      for (String userName : users) {
+        when(queue.getComputedActiveUserLimit(eq(userName), any(Resource.class),
+            any(User.class), anyString(),
+            eq(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)))
+                .thenReturn(userLimit);
       }
     }
   }
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 0a864fd..2cd3816 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; @@ -184,7 +185,7 @@ private void checkUserUsedResource(MockRM rm, String queueName, String userName, String partition, int memory) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName); - LeafQueue.User user = queue.getUser(userName); + UsersManager.User user = queue.getUser(userName); Assert.assertEquals(memory, user.getResourceUsage().getUsed(partition).getMemorySize()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index b2695bc..544862b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -76,11 +76,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.UsersManager.User; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -- 2.7.4 (Apple Git-66)
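A closing note on the mock set-up changed in ProportionalCapacityPreemptionPolicyMockFramework: Mockito requires a stubbed call to use argument matchers for either all arguments or none. With the new five-argument signature, the SchedulingMode constant must therefore be wrapped in eq(..) alongside any()/anyString(), otherwise Mockito throws InvalidUseOfMatchersException at run time. The stub inside a test method, with the usual static imports (queue, userName, and userLimit are assumed to come from the surrounding framework code):

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Matchers.eq;
    import static org.mockito.Mockito.when;

    // All five arguments are matchers; mixing a raw enum with any()/eq()/
    // anyString() would be rejected by Mockito.
    when(queue.getComputedActiveUserLimit(eq(userName), any(Resource.class),
        any(User.class), anyString(),
        eq(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)))
            .thenReturn(userLimit);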