From 66c6087859a4b39daa4821b28c69a724eb5b1cec Mon Sep 17 00:00:00 2001 From: Sunil G Date: Tue, 17 Jan 2017 17:22:51 +0530 Subject: [PATCH] YARN-5889 --- .../resourcemanager/UserToPartitionRecord.java | 93 +++ .../capacity/FifoIntraQueuePreemptionPlugin.java | 7 +- .../scheduler/ActiveUsersManager.java | 58 +- .../capacity/CapacityHeadroomProvider.java | 4 +- .../scheduler/capacity/LeafQueue.java | 625 +++++------------- .../scheduler/capacity/UsersManager.java | 727 +++++++++++++++++++++ .../webapp/dao/CapacitySchedulerLeafQueueInfo.java | 2 +- ...ionalCapacityPreemptionPolicyMockFramework.java | 9 +- .../TestCapacitySchedulerNodeLabelUpdate.java | 4 +- .../scheduler/capacity/TestLeafQueue.java | 15 +- 10 files changed, 1051 insertions(+), 493 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java new file mode 100644 index 0000000..1b94e2e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +/** + * This class makes a mapping of UserName to Partition. This helps to get all + * users of a queue under each partition. + */ +public class UserToPartitionRecord + implements + Comparable { + + private String userName; + private String partition; + + public static UserToPartitionRecord newInstance(String userName, + String partition) { + UserToPartitionRecord userRecord = new UserToPartitionRecord(); + userRecord.setPartition(partition); + userRecord.setUserName(userName); + return userRecord; + } + + public String getPartition() { + return partition; + } + + public String getUserName() { + return userName; + } + + public void setPartition(String partition) { + this.partition = partition; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + @Override + public int compareTo(UserToPartitionRecord o) { + int result = this.getUserName().compareTo(o.getUserName()); + if (result == 0) { + return this.getPartition().compareTo(o.getPartition()); + } else { + return result; + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + 
UserToPartitionRecord other = (UserToPartitionRecord) obj; + if (!getUserName().equals(other.getUserName()) + || !getPartition().equals(other.getPartition())) { + return false; + } + return true; + } + + @Override + public int hashCode() { + final int prime = 493217; + int result = 8501; + result = prime * result + this.getUserName().hashCode(); + result = prime * result + this.getPartition().hashCode(); + return result; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 757f567..51f05b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -264,8 +265,10 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource, // Verify 
whether we already calculated headroom for this user. if (userLimitResource == null) { - userLimitResource = Resources.clone(tq.leafQueue - .getUserLimitPerUser(userName, partitionBasedResource, partition)); + userLimitResource = Resources + .clone(tq.leafQueue.getResourceLimitForAllUsers(userName, + clusterResource, tq.leafQueue.getUser(userName), partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); Resource amUsed = perUserAMUsed.get(userName); if (null == amUsed) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java index 36e6858..f4ad694 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,7 +48,9 @@ private int activeUsers = 0; private Map> usersApplications = new HashMap>(); - + private AtomicBoolean userAddedOrRemoved = new AtomicBoolean(false); + private ResourceUsage totalResUsedByActiveUsers = new ResourceUsage(); + public ActiveUsersManager(QueueMetrics metrics) { this.metrics = metrics; } @@ -65,6 +70,7 @@ synchronized public 
void activateApplication( usersApplications.put(user, userApps); ++activeUsers; metrics.incrActiveUsers(); + setUserAddedOrRemoved(true); LOG.debug("User " + user + " added to activeUsers, currently: " + activeUsers); } @@ -91,12 +97,13 @@ synchronized public void deactivateApplication( usersApplications.remove(user); --activeUsers; metrics.decrActiveUsers(); + setUserAddedOrRemoved(true); LOG.debug("User " + user + " removed from activeUsers, currently: " + activeUsers); } } } - + /** * Get number of active users i.e. users with applications which have pending * resource requests. @@ -106,4 +113,51 @@ synchronized public void deactivateApplication( synchronized public int getNumActiveUsers() { return activeUsers; } + + /** + * Get active users i.e. users with applications which have pending + * resource requests. + * @return unmodifiable view of the active user names + */ + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + synchronized public Collection getActiveUsers() { + return Collections.unmodifiableCollection(usersApplications.keySet()); + } + + /** + * + * @param user + * User Name + * @return whether given user is active or not. + */ + @Lock({Queue.class}) + synchronized public boolean isAnActiveUser(String user) { + return usersApplications.containsKey(user); + } + + /** + * Check whether any user has become active or inactive. + * @param newValue Value the flag is set to after it is read. + * @return Value of the flag before this call. + */ + public boolean isUserAddedOrRemoved(boolean newValue) { + return userAddedOrRemoved.getAndSet(newValue); + } + + /** + * Unconditionally update the active users tracking boolean. + * @param reset + * New value + */ + public void setUserAddedOrRemoved(boolean reset) { + this.userAddedOrRemoved.set(reset); + } + + /** + * Get total resource used by all users in a queue. 
+ * @return Resource used + */ + public ResourceUsage getTotalResUsedByActiveUsers() { + return totalResUsedByActiveUsers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index 5605f18..140a2ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -26,12 +26,12 @@ public class CapacityHeadroomProvider { - LeafQueue.User user; + UsersManager.User user; LeafQueue queue; FiCaSchedulerApp application; LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; - public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue, + public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue, FiCaSchedulerApp application, LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 0ea56e7..7018869 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -56,17 +53,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; @@ -80,7 +72,6 @@ import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -92,8 +83,6 @@ private static final Log LOG = LogFactory.getLog(LeafQueue.class); private float absoluteUsedCapacity = 0.0f; - private volatile int userLimit; - private volatile float userLimitFactor; protected int maxApplications; protected volatile int maxApplicationsPerUser; @@ -112,14 +101,12 @@ private volatile float minimumAllocationFactor; - private Map users = new ConcurrentHashMap<>(); - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private CapacitySchedulerContext scheduler; - private final ActiveUsersManager activeUsersManager; + private final UsersManager usersManager; // cache last cluster resource to compute actual capacity private Resource lastClusterResource = Resources.none(); @@ -131,10 +118,6 @@ private volatile OrderingPolicy orderingPolicy = null; - // Summation of consumed ratios for all users in queue - private float totalUserConsumedRatio = 0; - private UsageRatios qUsageRatios; - // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = @@ -149,13 +132,11 @@ public LeafQueue(CapacitySchedulerContext cs, super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + 
this.usersManager = new UsersManager(metrics, this, labelManager); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); - qUsageRatios = new UsageRatios(); - if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); @@ -187,8 +168,8 @@ protected void setupQueueConfigs(Resource clusterResource) setOrderingPolicy( conf.getOrderingPolicy(getQueuePath())); - userLimit = conf.getUserLimit(getQueuePath()); - userLimitFactor = conf.getUserLimitFactor(getQueuePath()); + usersManager.setUserLimit(conf.getUserLimit(getQueuePath())); + usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath())); maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { @@ -202,7 +183,8 @@ protected void setupQueueConfigs(Resource clusterResource) } } maxApplicationsPerUser = Math.min(maxApplications, - (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); + (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) + * usersManager.getUserLimitFactor())); maxAMResourcePerQueuePercent = conf.getMaximumApplicationMasterResourcePerQueuePercent( @@ -260,8 +242,9 @@ protected void setupQueueConfigs(Resource clusterResource) + queueCapacities.getAbsoluteMaximumCapacity() + " [= 1.0 maximumCapacity undefined, " + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" - + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" - + "\n" + "userLimitFactor = " + userLimitFactor + + "\n" + "userLimit = " + usersManager.getUserLimit() + + " [= configuredUserLimit ]" + + "\n" + "userLimitFactor = " + usersManager.getUserLimitFactor() + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + maxApplications + " [= configuredMaximumSystemApplicationsPerQueue or" @@ -322,9 +305,17 @@ public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } + /** + * + * 
@return UsersManager instance. + */ + public UsersManager getUsersManager() { + return usersManager; + } + @Override public ActiveUsersManager getActiveUsersManager() { - return activeUsersManager; + return usersManager.getActiveUsersManager(); } @Override @@ -338,7 +329,8 @@ public ActiveUsersManager getActiveUsersManager() { */ @VisibleForTesting void setUserLimit(int userLimit) { - this.userLimit = userLimit; + usersManager.setUserLimit(userLimit); + usersManager.userLimitNeedsRecompute(); } /** @@ -347,7 +339,8 @@ void setUserLimit(int userLimit) { */ @VisibleForTesting void setUserLimitFactor(float userLimitFactor) { - this.userLimitFactor = userLimitFactor; + usersManager.setUserLimitFactor(userLimitFactor); + usersManager.userLimitNeedsRecompute(); } @Override @@ -408,12 +401,12 @@ public int getNumActiveApplications(String user) { @Private public int getUserLimit() { - return userLimit; + return usersManager.getUserLimit(); } @Private public float getUserLimitFactor() { - return userLimitFactor; + return usersManager.getUserLimitFactor(); } @Override @@ -463,44 +456,7 @@ public String toString() { @VisibleForTesting public User getUser(String userName) { - return users.get(userName); - } - - // Get and add user if absent - private User getUserAndAddIfAbsent(String userName) { - try { - writeLock.lock(); - User u = users.get(userName); - if (null == u) { - u = new User(); - users.put(userName, u); - } - return u; - } finally { - writeLock.unlock(); - } - } - - /** - * @return an ArrayList of UserInfo objects who are active in this queue - */ - public ArrayList getUsers() { - try { - readLock.lock(); - ArrayList usersToReturn = new ArrayList(); - for (Map.Entry entry : users.entrySet()) { - User user = entry.getValue(); - usersToReturn.add( - new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), - user.getActiveApplications(), user.getPendingApplications(), - Resources.clone(user.getConsumedAMResources()), - 
Resources.clone(user.getUserResourceLimit()), - user.getResourceUsage())); - } - return usersToReturn; - } finally { - readLock.unlock(); - } + return usersManager.getUser(userName); } @Private @@ -561,7 +517,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); // Add the attempt to our data-structures addApplicationAttempt(application, user); @@ -618,7 +574,7 @@ public void validateSubmitApplication(ApplicationId applicationId, } // Check submission limits for the user on this queue - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { String msg = "Queue " + getQueuePath() + " already has " + user .getTotalApplications() + " applications from user " + userName @@ -668,7 +624,7 @@ public Resource getUserAMResourceLimitPerPartition( * the absolute queue capacity (per partition) instead of the max and is * modified by the userlimit and the userlimit factor as is the userlimit */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, + float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( @@ -680,7 +636,8 @@ public Resource getUserAMResourceLimitPerPartition( Resource userAMLimit = Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionResource, queueCapacities.getMaxAMResourcePercentage(nodePartition) - * effectiveUserLimit * userLimitFactor, minimumAllocation); + * effectiveUserLimit * usersManager.getUserLimitFactor(), + minimumAllocation); return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, 
userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? userAMLimit : @@ -895,7 +852,8 @@ private void addApplicationAttempt(FiCaSchedulerApp application, @Override public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager - activeUsersManager.deactivateApplication(user, application); + usersManager.getActiveUsersManager().deactivateApplication(user, + application); appFinished(); @@ -917,7 +875,7 @@ private void removeApplicationAttempt( // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); String partitionName = application.getAppAMNodePartitionName(); boolean wasActive = orderingPolicy.removeSchedulableEntity(application); @@ -935,7 +893,7 @@ private void removeApplicationAttempt( user.finishApplication(wasActive); if (user.getTotalApplications() == 0) { - users.remove(application.getUser()); + usersManager.removeUser(application.getUser()); } // Check if we can activate more applications @@ -1282,8 +1240,8 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, String partition) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application.getUser(), clusterResource, user, - partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + getResourceLimitForActiveUsers(application.getUser(), clusterResource, + user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); } @@ -1355,9 +1313,8 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also - Resource userLimit = - computeUserLimit(application.getUser(), clusterResource, queueUser, - nodePartition, schedulingMode); + Resource userLimit = 
getResourceLimitForActiveUsers(application.getUser(), + clusterResource, queueUser, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1393,129 +1350,52 @@ public boolean getRackLocalityFullReset() { return rackLocalityFullReset; } - @Lock(NoLock.class) - private Resource computeUserLimit(String userName, - Resource clusterResource, User user, - String nodePartition, SchedulingMode schedulingMode) { - Resource partitionResource = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // What is our current capacity? - // * It is equal to the max(required, queue-capacity) if - // we're running below capacity. The 'max' ensures that jobs in queues - // with miniscule capacity (< 1 slot) make progress - // * If we're running over capacity, then its - // (usedResources + required) (which extra resources we are allocating) - Resource queueCapacity = - Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); - - // Assume we have required resource equals to minimumAllocation, this can - // make sure user limit can continuously increase till queueMaxResource - // reached. - Resource required = minimumAllocation; - - // Allow progress for queues with miniscule capacity - queueCapacity = - Resources.max( - resourceCalculator, partitionResource, - queueCapacity, - required); - - - /* We want to base the userLimit calculation on - * max(queueCapacity, usedResources+required). However, we want - * usedResources to be based on the combined ratios of all the users in the - * queue so we use consumedRatio to calculate such. - * The calculation is dependent on how the resourceCalculator calculates the - * ratio between two Resources. 
DRF Example: If usedResources is - * greater than queueCapacity and users have the following [mem,cpu] usages: - * User1: [10%,20%] - Dominant resource is 20% - * User2: [30%,10%] - Dominant resource is 30% - * Then total consumedRatio is then 20+30=50%. Yes, this value can be - * larger than 100% but for the purposes of making sure all users are - * getting their fair share, it works. - */ - Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, qUsageRatios.getUsageRatio(nodePartition), - minimumAllocation); - Resource currentCapacity = - Resources.lessThan(resourceCalculator, partitionResource, consumed, - queueCapacity) ? queueCapacity : Resources.add(consumed, required); - // Never allow a single user to take more than the - // queue's configured capacity * user-limit-factor. - // Also, the queue's configured capacity should be higher than - // queue-hard-limit * ulMin - - final int activeUsers = activeUsersManager.getNumActiveUsers(); - - // User limit resource is determined by: - // max{currentCapacity / #activeUsers, currentCapacity * - // user-limit-percentage%) - Resource userLimitResource = Resources.max( - resourceCalculator, partitionResource, - Resources.divideAndCeil( - resourceCalculator, currentCapacity, activeUsers), - Resources.divideAndCeil( - resourceCalculator, - Resources.multiplyAndRoundDown( - currentCapacity, userLimit), - 100) - ); - - // User limit is capped by maxUserLimit - // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY) - // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) - // - // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a - // partition, its guaranteed resource on that partition is 0. And - // user-limit-factor computation is based on queue's guaranteed capacity. So - // we will not cap user-limit as well as used resource when doing - // IGNORE_PARTITION_EXCLUSIVITY allocation. 
- Resource maxUserLimit = Resources.none(); - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxUserLimit = - Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor); - } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - maxUserLimit = partitionResource; - } - - // Cap final user limit with maxUserLimit - userLimitResource = - Resources.roundUp( - resourceCalculator, - Resources.min( - resourceCalculator, partitionResource, - userLimitResource, - maxUserLimit - ), - minimumAllocation); + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getResourceLimitForActiveUsers(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode) { + return usersManager.getComputedResourceLimitForActiveUsers(userName, + clusterResource, user, nodePartition, schedulingMode, + resourceCalculator); + } - if (LOG.isDebugEnabled()) { - LOG.debug("User limit computation for " + userName + - " in queue " + getQueueName() + - " userLimitPercent=" + userLimit + - " userLimitFactor=" + userLimitFactor + - " required: " + required + - " consumed: " + consumed + - " user-limit-resource: " + userLimitResource + - " queueCapacity: " + queueCapacity + - " qconsumed: " + queueUsage.getUsed() + - " consumedRatio: " + totalUserConsumedRatio + - " currentCapacity: " + currentCapacity + - " activeUsers: " + activeUsers + - " clusterCapacity: " + clusterResource + - " resourceByLabel: " + partitionResource + - " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) + - " Partition: " + nodePartition - ); - } - 
user.setUserResourceLimit(userLimitResource); - return userLimitResource; + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getResourceLimitForAllUsers(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode) { + return usersManager.getComputedResourceLimitForAllUsers(userName, + clusterResource, user, nodePartition, schedulingMode, + resourceCalculator); } - + @Private protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, @@ -1620,52 +1500,6 @@ private void updateSchedulerHealthForCompletedContainer( } } - private float calculateUserUsageRatio(Resource clusterResource, - String nodePartition) { - try { - writeLock.lock(); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - float consumed = 0; - User user; - for (Map.Entry entry : users.entrySet()) { - user = entry.getValue(); - consumed += user.resetAndUpdateUsageRatio(resourceCalculator, - resourceByLabel, nodePartition); - } - return consumed; - } finally { - writeLock.unlock(); - } - } - - private void recalculateQueueUsageRatio(Resource clusterResource, - String nodePartition) { - try { - writeLock.lock(); - ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); - - if (nodePartition == null) { - for (String partition : Sets.union( - queueCapacities.getNodePartitionsSet(), - queueResourceUsage.getNodePartitionsSet())) { - qUsageRatios.setUsageRatio(partition, - calculateUserUsageRatio(clusterResource, partition)); - } - } else{ - qUsageRatios.setUsageRatio(nodePartition, 
- calculateUserUsageRatio(clusterResource, nodePartition)); - } - } finally { - writeLock.unlock(); - } - } - - private void updateQueueUsageRatio(String nodePartition, - float delta) { - qUsageRatios.incUsageRatio(nodePartition, delta); - } - @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, @@ -1762,14 +1596,16 @@ void allocateResource(Resource clusterResource, // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); + // New container is allocated. Invalidate user-limit. user.assignContainer(resource, nodePartition); + usersManager.userLimitNeedsRecompute(); + // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + usersManager.updateQueueUsageRatio(nodePartition, user.updateUsageRatio( + resourceCalculator, resourceByLabel, nodePartition)); // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine @@ -1812,13 +1648,15 @@ void releaseResource(Resource clusterResource, // Update user metrics String userName = application.getUser(); - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); user.releaseContainer(resource, nodePartition); + // Container is released. Invalidate user-limit. 
+ usersManager.userLimitNeedsRecompute(); + // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + usersManager.updateQueueUsageRatio(nodePartition, user.updateUsageRatio( + resourceCalculator, resourceByLabel, nodePartition)); metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); @@ -1853,6 +1691,31 @@ private void updateCurrentResourceLimits( currentResourceLimits.getLimit())); } + /** + * Recalculate QueueUsage Ratio + * @param clusterResource Total Cluster Resource + * @param nodePartition Partition + */ + public void recalculateQueueUsageRatio(Resource clusterResource, + String nodePartition) { + try { + writeLock.lock(); + ResourceUsage queueResourceUsage = getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union( + getQueueCapacities().getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + usersManager.setUsageRatio(partition, clusterResource); + } + } else { + usersManager.setUsageRatio(nodePartition, clusterResource); + } + } finally { + writeLock.unlock(); + } + } + @Override public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { @@ -1877,6 +1740,10 @@ public void updateClusterResource(Resource clusterResource, // activate the pending applications if possible activateApplications(); + // In case of any resource change, invalidate recalculateULCount to clear + // the computed user-limit. 
+ usersManager.userLimitNeedsRecompute(); + // Update application properties for (FiCaSchedulerApp application : orderingPolicy .getSchedulableEntities()) { @@ -1892,16 +1759,20 @@ public void updateClusterResource(Resource clusterResource, @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel, - resourceToInc); + User user = getUser(application.getUser()); + user.getResourceUsage().incUsed(nodeLabel, resourceToInc); + user.assignContainer(resourceToInc, nodeLabel); + super.incUsedResource(nodeLabel, resourceToInc, application); } @Override public void decUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel, - resourceToDec); + User user = getUser(application.getUser()); + user.getResourceUsage().decUsed(nodeLabel, resourceToDec); + user.releaseContainer(resourceToDec, nodeLabel); + super.decUsedResource(nodeLabel, resourceToDec, application); } @@ -1921,191 +1792,6 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec, queueUsage.decAMUsed(nodeLabel, resourceToDec); } - /* - * Usage Ratio - */ - static private class UsageRatios { - private Map usageRatios; - private ReadLock readLock; - private WriteLock writeLock; - - public UsageRatios() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - usageRatios = new HashMap(); - } - - private void incUsageRatio(String label, float delta) { - try { - writeLock.lock(); - Float fl = usageRatios.get(label); - if (null == fl) { - fl = new Float(0.0); - } - fl += delta; - usageRatios.put(label, new Float(fl)); - } finally { - writeLock.unlock(); - } - } - - float getUsageRatio(String label) { - try { - readLock.lock(); - Float f = usageRatios.get(label); - if (null == f) { - return 
0.0f; - } - return f; - } finally { - readLock.unlock(); - } - } - - private void setUsageRatio(String label, float ratio) { - try { - writeLock.lock(); - usageRatios.put(label, new Float(ratio)); - } finally { - writeLock.unlock(); - } - } - } - - @VisibleForTesting - public float getUsageRatio(String label) { - return qUsageRatios.getUsageRatio(label); - } - - @VisibleForTesting - public static class User { - ResourceUsage userResourceUsage = new ResourceUsage(); - volatile Resource userResourceLimit = Resource.newInstance(0, 0); - volatile int pendingApplications = 0; - volatile int activeApplications = 0; - private UsageRatios userUsageRatios = new UsageRatios(); - private WriteLock writeLock; - - User() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - // Nobody uses read-lock now, will add it when necessary - writeLock = lock.writeLock(); - } - - public ResourceUsage getResourceUsage() { - return userResourceUsage; - } - - public float resetAndUpdateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - userUsageRatios.setUsageRatio(nodePartition, 0); - return updateUsageRatio(resourceCalculator, resource, nodePartition); - } finally { - writeLock.unlock(); - } - } - - public float updateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - float delta; - float newRatio = Resources.ratio(resourceCalculator, - getUsed(nodePartition), resource); - delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); - userUsageRatios.setUsageRatio(nodePartition, newRatio); - return delta; - } finally { - writeLock.unlock(); - } - } - - public Resource getUsed() { - return userResourceUsage.getUsed(); - } - - public Resource getAllUsed() { - return userResourceUsage.getAllUsed(); - } - - public Resource getUsed(String label) { - return userResourceUsage.getUsed(label); - } - - public int 
getPendingApplications() { - return pendingApplications; - } - - public int getActiveApplications() { - return activeApplications; - } - - public Resource getConsumedAMResources() { - return userResourceUsage.getAMUsed(); - } - - public Resource getConsumedAMResources(String label) { - return userResourceUsage.getAMUsed(label); - } - - public int getTotalApplications() { - return getPendingApplications() + getActiveApplications(); - } - - public void submitApplication() { - try { - writeLock.lock(); - ++pendingApplications; - } finally { - writeLock.unlock(); - } - } - - public void activateApplication() { - try { - writeLock.lock(); - --pendingApplications; - ++activeApplications; - } finally { - writeLock.unlock(); - } - } - - public void finishApplication(boolean wasActive) { - try { - writeLock.lock(); - if (wasActive) { - --activeApplications; - } else{ - --pendingApplications; - } - } finally { - writeLock.unlock(); - } - } - - public void assignContainer(Resource resource, String nodePartition) { - userResourceUsage.incUsed(nodePartition, resource); - } - - public void releaseContainer(Resource resource, String nodePartition) { - userResourceUsage.decUsed(nodePartition, resource); - } - - public Resource getUserResourceLimit() { - return userResourceLimit; - } - - public void setUserResourceLimit(Resource userResourceLimit) { - this.userResourceLimit = userResourceLimit; - } - } - @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt attempt, RMContainer rmContainer) { @@ -2159,25 +1845,25 @@ public void recoverContainer(Resource clusterResource, * Get total pending resource considering user limit for the leaf queue. This * will be used for calculating pending resources in the preemption monitor. * - * Consider the headroom for each user in the queue. 
- * Total pending for the queue = - * sum(for each user(min((user's headroom), sum(user's pending requests)))) - * NOTE: - - * @param clusterResources clusterResource - * @param partition node partition - * @param deductReservedFromPending When a container is reserved in CS, - * pending resource will not be deducted. - * This could lead to double accounting when - * doing preemption: - * In normal cases, we should deduct reserved - * resource from pending to avoid - * excessive preemption. + * Consider the headroom for each user in the queue. Total pending for the + * queue = sum(for each user(min((user's headroom), sum(user's pending + * requests)))) NOTE: + * + * @param clusterResources + * clusterResource + * @param partition + * node partition + * @param deductReservedFromPending + * When a container is reserved in CS, pending resource will not be + * deducted. This could lead to double accounting when doing + * preemption: In normal cases, we should deduct reserved resource + * from pending to avoid excessive preemption. * @return Total pending resource considering user limit */ public Resource getTotalPendingResourcesConsideringUserLimit( - Resource clusterResources, String partition, boolean deductReservedFromPending) { + Resource clusterResources, String partition, + boolean deductReservedFromPending) { try { readLock.lock(); Map userNameToHeadroom = @@ -2188,7 +1874,8 @@ public Resource getTotalPendingResourcesConsideringUserLimit( if (!userNameToHeadroom.containsKey(userName)) { User user = getUser(userName); Resource headroom = Resources.subtract( - computeUserLimit(app.getUser(), clusterResources, user, partition, + getResourceLimitForActiveUsers(app.getUser(), clusterResources, + user, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), user.getUsed(partition)); // Make sure headroom is not negative. 
@@ -2219,16 +1906,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit( } - public synchronized Resource getUserLimitPerUser(String userName, - Resource resources, String partition) { - - // Check user resource limit - User user = getUser(userName); - - return computeUserLimit(userName, resources, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - } - @Override public void collectSchedulerApplications( Collection apps) { @@ -2422,8 +2099,8 @@ public void decreaseContainer(Resource clusterResource, } } - public void updateApplicationPriority(SchedulerApplication app, - Priority newAppPriority) { + public void updateApplicationPriority( + SchedulerApplication app, Priority newAppPriority) { try { writeLock.lock(); FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java new file mode 100644 index 0000000..34f31da --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -0,0 +1,727 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.server.utils.Lock.NoLock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +/** + * {@link UsersManager} tracks users in the system and its respective data + * structures. 
+ */ +@Private +public class UsersManager { + + private static final Log LOG = LogFactory.getLog(UsersManager.class); + + /** + * UsageRatios will store the total used resources ratio across all users of + * the queue. + */ + static private class UsageRatios { + private Map usageRatios; + private ReadLock readLock; + private WriteLock writeLock; + + public UsageRatios() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + usageRatios = new HashMap(); + } + + private void incUsageRatio(String label, float delta) { + try { + writeLock.lock(); + Float fl = usageRatios.get(label); + if (null == fl) { + fl = new Float(0.0); + } + fl += delta; + usageRatios.put(label, new Float(fl)); + } finally { + writeLock.unlock(); + } + } + + private float getUsageRatio(String label) { + try { + readLock.lock(); + Float f = usageRatios.get(label); + if (null == f) { + return 0.0f; + } + return f; + } finally { + readLock.unlock(); + } + } + + private void setUsageRatio(String label, float ratio) { + try { + writeLock.lock(); + usageRatios.put(label, new Float(ratio)); + } finally { + writeLock.unlock(); + } + } + } /* End of UserRatios class */ + + @VisibleForTesting + public static class User { + ResourceUsage userResourceUsage = new ResourceUsage(); + ActiveUsersManager activeUsersManager = null; + String userName = null; + volatile Resource userResourceLimit = Resource.newInstance(0, 0); + volatile AtomicInteger pendingApplications = new AtomicInteger(0); + volatile AtomicInteger activeApplications = new AtomicInteger(0); + volatile long localVersionOfUsersState = 0; + + private UsageRatios userUsageRatios = new UsageRatios(); + private WriteLock writeLock; + + public User(ActiveUsersManager activeUsersManager, String name) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + // Nobody uses read-lock now, will add it when necessary + writeLock = lock.writeLock(); + this.activeUsersManager = 
activeUsersManager; + this.userName = name; + } + + public ResourceUsage getResourceUsage() { + return userResourceUsage; + } + + public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator, + Resource resource, String nodePartition) { + try { + writeLock.lock(); + userUsageRatios.setUsageRatio(nodePartition, 0); + return updateUsageRatio(resourceCalculator, resource, nodePartition); + } finally { + writeLock.unlock(); + } + } + + public float updateUsageRatio(ResourceCalculator resourceCalculator, + Resource resource, String nodePartition) { + try { + writeLock.lock(); + float delta; + float newRatio = Resources.ratio(resourceCalculator, + getUsed(nodePartition), resource); + delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); + userUsageRatios.setUsageRatio(nodePartition, newRatio); + return delta; + } finally { + writeLock.unlock(); + } + } + + public Resource getUsed() { + return userResourceUsage.getUsed(); + } + + public Resource getAllUsed() { + return userResourceUsage.getAllUsed(); + } + + public Resource getUsed(String label) { + return userResourceUsage.getUsed(label); + } + + public int getPendingApplications() { + return pendingApplications.get(); + } + + public int getActiveApplications() { + return activeApplications.get(); + } + + public Resource getConsumedAMResources() { + return userResourceUsage.getAMUsed(); + } + + public Resource getConsumedAMResources(String label) { + return userResourceUsage.getAMUsed(label); + } + + public int getTotalApplications() { + return getPendingApplications() + getActiveApplications(); + } + + public void submitApplication() { + pendingApplications.incrementAndGet(); + } + + public void activateApplication() { + pendingApplications.decrementAndGet(); + activeApplications.incrementAndGet(); + } + + public void finishApplication(boolean wasActive) { + if (wasActive) { + activeApplications.decrementAndGet(); + } else { + pendingApplications.decrementAndGet(); + } + } + + public void 
assignContainer(Resource resource, String nodePartition) { + userResourceUsage.incUsed(nodePartition, resource); + + // Add to active user's total resource as well. + if (activeUsersManager.isAnActiveUser(userName)) { + activeUsersManager.getTotalResUsedByActiveUsers().incUsed(nodePartition, + resource); + } + } + + public void releaseContainer(Resource resource, String nodePartition) { + userResourceUsage.decUsed(nodePartition, resource); + + // Decrement from active user's total resource as well. + if (activeUsersManager.isAnActiveUser(userName)) { + activeUsersManager.getTotalResUsedByActiveUsers().decUsed(nodePartition, + resource); + } + } + + public Resource getUserResourceLimit() { + return userResourceLimit; + } + + public void setUserResourceLimit(Resource userResourceLimit) { + this.userResourceLimit = userResourceLimit; + } + + public void setLocalVersionOfUsersState(long updatedCount) { + try { + writeLock.lock(); + this.localVersionOfUsersState = updatedCount; + } finally { + writeLock.unlock(); + } + } + + public long getLocalVersionOfUsersState() { + return localVersionOfUsersState; + } + } /* End of User class */ + + /* + * Member declaration for UsersManager class. + */ + private final LeafQueue lQueue; + private final RMNodeLabelsManager labelManager; + + private ActiveUsersManager activeUsersManager = null; + private Map users = new ConcurrentHashMap<>(); + + // Summation of consumed ratios for all users in queue + private UsageRatios qUsageRatios; + + // To detect whether there is a change in user count for every user-limit + // calculation. + private AtomicLong latestVersionOfUsersState = new AtomicLong(0); + + private volatile int userLimit; + private volatile float userLimitFactor; + + private WriteLock writeLock; + private ReadLock readLock; + + // Pre-computed list of user-limits. 
+ Map> + preComputedActiveUserLimit = new ConcurrentHashMap<>(); + Map> + preComputedAllUserLimit = new ConcurrentHashMap<>(); + + public UsersManager(QueueMetrics metrics, LeafQueue lQueue, + RMNodeLabelsManager labelManager) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.activeUsersManager = new ActiveUsersManager(metrics); + this.lQueue = lQueue; + this.labelManager = labelManager; + this.qUsageRatios = new UsageRatios(); + + this.writeLock = lock.writeLock(); + this.readLock = lock.readLock(); + } + + public int getUserLimit() { + return userLimit; + } + + public void setUserLimit(int userLimit) { + this.userLimit = userLimit; + } + + public float getUserLimitFactor() { + return userLimitFactor; + } + + public void setUserLimitFactor(float userLimitFactor) { + this.userLimitFactor = userLimitFactor; + } + + @VisibleForTesting + public float getUsageRatio(String label) { + return qUsageRatios.getUsageRatio(label); + } + + /** + * Get Active User Manager + * @return activeUserManager instance. + */ + public ActiveUsersManager getActiveUsersManager() { + return activeUsersManager; + } + + /** + * Get latest users state + * @return get cached user count to check whether userlimit has to be + * invalidated. + */ + public long getLatestVersionOfUsersState() { + return latestVersionOfUsersState.get(); + } + + /** + * Force UsersManager to recompute userlimit. + */ + public void userLimitNeedsRecompute() { + this.latestVersionOfUsersState.incrementAndGet(); + } + + /** + * Get all users of queue. + * + * @return users + */ + public Map getUsers() { + return users; + } + + /** + * Get user object for given user name. + * + * @param userName + * User Name + * @return User object + */ + public User getUser(String userName) { + return users.get(userName); + } + + /** + * Remove user. + * + * @param userName + * User Name + */ + public void removeUser(String userName) { + this.users.remove(userName); + } + + /** + * Get and add user if absent. 
+ * + * @param userName + * User Name + * @return User object + */ + public User getUserAndAddIfAbsent(String userName) { + try { + writeLock.lock(); + User u = getUser(userName); + if (null == u) { + u = new User(getActiveUsersManager(), userName); + addUser(userName, u); + } + return u; + } finally { + writeLock.unlock(); + } + } + + /* + * Add a new user + */ + private void addUser(String userName, User user) { + this.users.put(userName, user); + } + + /** + * @return an ArrayList of UserInfo objects who are active in this queue + */ + public ArrayList getUsersInfo() { + try { + readLock.lock(); + ArrayList usersToReturn = new ArrayList(); + for (Map.Entry entry : getUsers().entrySet()) { + User user = entry.getValue(); + usersToReturn.add( + new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), + user.getActiveApplications(), user.getPendingApplications(), + Resources.clone(user.getConsumedAMResources()), + Resources.clone(user.getUserResourceLimit()), + user.getResourceUsage())); + } + return usersToReturn; + } finally { + readLock.unlock(); + } + } + + /** + * Get computed user-limit for all ACTIVE users in this queue. If cached data + * is invalidated due to resource change, this method also enforce to + * recompute user-limit. + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @param rc + * Resource Calculator + * @return Computed User Limit + */ + public Resource getComputedResourceLimitForActiveUsers(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode, ResourceCalculator rc) { + + // In allocation thread, cross check whether user became active or not. 
+ // Such a change needs recomputation for all user-limits. + boolean isUsedAddedOrRemoved = activeUsersManager + .isUserAddedOrRemoved(false); + if (isUsedAddedOrRemoved) { + // Clear computed user-limits and ask for recomputation. + userLimitNeedsRecompute(); + } + + UserToPartitionRecord userRecord = UserToPartitionRecord + .newInstance(userName, nodePartition); + Map userLimitPerSchedulingMode = preComputedActiveUserLimit + .get(userRecord); + + try { + writeLock.lock(); + long latestVersionOfUserCount = getLatestVersionOfUsersState(); + + if (isRecomputeNeeded(user, userLimitPerSchedulingMode, + latestVersionOfUserCount, schedulingMode)) { + // recompute + userLimitPerSchedulingMode = reComputeUserLimits(rc, userName, + nodePartition, clusterResource, schedulingMode, true); + + // update user count to cache so that we can avoid recompute if no major + // changes. + user.setLocalVersionOfUsersState(latestVersionOfUserCount); + } + } finally { + writeLock.unlock(); + } + + return userLimitPerSchedulingMode.get(schedulingMode); + } + + /** + * Get computed user-limit for all users in this queue. If cached data is + * invalidated due to resource change, this method also enforce to recompute + * user-limit. + * + * @param userName + * Name of user who has submitted one/more app to given queue. 
+ * @param clusterResource + * total cluster resource + * @param user + * user who owns the applications in this queue + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @param rc + * Resource Calculator + * @return Computed User Limit + */ + public Resource getComputedResourceLimitForAllUsers(String userName, + Resource clusterResource, User user, String nodePartition, + SchedulingMode schedulingMode, ResourceCalculator rc) { + + UserToPartitionRecord userRecord = UserToPartitionRecord + .newInstance(userName, nodePartition); + Map userLimitPerSchedulingMode = preComputedAllUserLimit + .get(userRecord); + + try { + writeLock.lock(); + long latestVersionOfUserCount = getLatestVersionOfUsersState(); + + if (isRecomputeNeeded(user, userLimitPerSchedulingMode, + latestVersionOfUserCount, schedulingMode)) { + // recompute + userLimitPerSchedulingMode = reComputeUserLimits(rc, userName, + nodePartition, clusterResource, schedulingMode, false); + + // update user count to cache so that we can avoid recompute if no major + // changes. + user.setLocalVersionOfUsersState(latestVersionOfUserCount); + } + } finally { + writeLock.unlock(); + } + + return userLimitPerSchedulingMode.get(schedulingMode); + } + + /* + * Recompute user-limit under following conditions: 1. cached user-limit does + * not exist in local map. 2. Total User count doesn't match with local cached + * version. 3. If given schedulingMode's user-limit is not computed. 
+ */ + private boolean isRecomputeNeeded(User user, + Map userLimitPerSchedulingMode, + long latestVersionOfUserCount, SchedulingMode schedulingMode) { + return userLimitPerSchedulingMode == null + || user.getLocalVersionOfUsersState() != latestVersionOfUserCount + || !userLimitPerSchedulingMode.containsKey(schedulingMode); + } + + private Map reComputeUserLimits( + ResourceCalculator rc, String userName, String nodePartition, + Resource clusterResource, SchedulingMode schedulingMode, + boolean activeMode) { + UserToPartitionRecord userRecord = UserToPartitionRecord + .newInstance(userName, nodePartition); + Map> computedMap = null; + + // preselect stored map as per active user-limit or all user computation. + computedMap = (activeMode == true) + ? preComputedActiveUserLimit + : preComputedAllUserLimit; + + Map userLimitPerSchedulingMode = computedMap + .get(userRecord); + + if (userLimitPerSchedulingMode == null) { + userLimitPerSchedulingMode = new ConcurrentHashMap<>(); + computedMap.put(userRecord, userLimitPerSchedulingMode); + } + + userLimitPerSchedulingMode.clear(); + + // compute user-limit per scheduling mode. + Resource userLimit = computeUserLimit(rc, userName, clusterResource, + lQueue.getUser(userName), nodePartition, schedulingMode, activeMode); + + // update in local storage + userLimitPerSchedulingMode.put(schedulingMode, userLimit); + + return userLimitPerSchedulingMode; + } + + @Lock(NoLock.class) + private Resource computeUserLimit(ResourceCalculator resourceCalculator, + String userName, Resource clusterResource, User user, + String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { + Resource partitionResource = labelManager.getResourceByLabel(nodePartition, + clusterResource); + + // What is our current capacity? + // * It is equal to the max(required, queue-capacity) if + // we're running below capacity. 
The 'max' ensures that jobs in queues + // with miniscule capacity (< 1 slot) make progress + // * If we're running over capacity, then its + // (usedResources + required) (which extra resources we are allocating) + Resource queueCapacity = Resources.multiplyAndNormalizeUp( + resourceCalculator, partitionResource, + lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition), + lQueue.getMinimumAllocation()); + + // Assume we have required resource equals to minimumAllocation, this can + // make sure user limit can continuously increase till queueMaxResource + // reached. + Resource required = lQueue.getMinimumAllocation(); + + // Allow progress for queues with miniscule capacity + queueCapacity = Resources.max(resourceCalculator, partitionResource, + queueCapacity, required); + + /* + * We want to base the userLimit calculation on max(queueCapacity, + * usedResources+required). However, we want usedResources to be based on + * the combined ratios of all the users in the queue so we use consumedRatio + * to calculate such. The calculation is dependent on how the + * resourceCalculator calculates the ratio between two Resources. DRF + * Example: If usedResources is greater than queueCapacity and users have + * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is + * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio + * is then 20+30=50%. Yes, this value can be larger than 100% but for the + * purposes of making sure all users are getting their fair share, it works. + */ + Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, + partitionResource, getUsageRatio(nodePartition), + lQueue.getMinimumAllocation()); + Resource currentCapacity = Resources.lessThan(resourceCalculator, + partitionResource, consumed, queueCapacity) + ? queueCapacity + : Resources.add(consumed, required); + // Never allow a single user to take more than the + // queue's configured capacity * user-limit-factor. 
+ // Also, the queue's configured capacity should be higher than + // queue-hard-limit * ulMin + + Resource resourceUsed = Resources.createResource(0, 0); + int activeUsersCount = activeUsersManager.getNumActiveUsers(); + + // For activeUser calculation, consider only active users resource used. + if (activeUser) { + resourceUsed = activeUsersManager.getTotalResUsedByActiveUsers() + .getUsed(nodePartition); + } else { + resourceUsed = currentCapacity; + } + + // User limit resource is determined by: + // max{currentCapacity / #activeUsers, currentCapacity * + // user-limit-percentage%) + Resource userLimitResource = Resources.max(resourceCalculator, + partitionResource, + Resources.divideAndCeil(resourceCalculator, resourceUsed, + activeUsersCount), + Resources.divideAndCeil(resourceCalculator, + Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()), + 100)); + + // User limit is capped by maxUserLimit + // - maxUserLimit = queueCapacity * user-limit-factor + // (RESPECT_PARTITION_EXCLUSIVITY) + // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) + // + // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a + // partition, its guaranteed resource on that partition is 0. And + // user-limit-factor computation is based on queue's guaranteed capacity. So + // we will not cap user-limit as well as used resource when doing + // IGNORE_PARTITION_EXCLUSIVITY allocation. 
+ Resource maxUserLimit = Resources.none(); + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, + getUserLimitFactor()); + } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { + maxUserLimit = partitionResource; + } + + // Cap final user limit with maxUserLimit + userLimitResource = Resources + .roundUp(resourceCalculator, + Resources.min(resourceCalculator, partitionResource, + userLimitResource, maxUserLimit), + lQueue.getMinimumAllocation()); + + if (LOG.isDebugEnabled()) { + LOG.debug("User limit computation for " + userName + " in queue " + + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit() + + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: " + + required + " consumed: " + consumed + " user-limit-resource: " + + userLimitResource + " queueCapacity: " + queueCapacity + + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed() + + " currentCapacity: " + currentCapacity + " activeUsers: " + + activeUsersCount + " clusterCapacity: " + clusterResource + + " resourceByLabel: " + partitionResource + " usageratio: " + + getUsageRatio(nodePartition) + " Partition: " + nodePartition); + } + user.setUserResourceLimit(userLimitResource); + return userLimitResource; + } + + private float calculateUserUsageRatio(Resource clusterResource, + String nodePartition) { + try { + writeLock.lock(); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + float consumed = 0; + User user; + for (Map.Entry entry : getUsers().entrySet()) { + user = entry.getValue(); + consumed += user.setAndUpdateUsageRatio(lQueue.resourceCalculator, + resourceByLabel, nodePartition); + } + return consumed; + } finally { + writeLock.unlock(); + } + } + + /** + * Set new usage ratio. 
+   *
+   * @param partition
+   *          Node partition whose stored usage ratio is replaced
+   * @param clusterResource
+   *          Cluster Resource handed to the per-user recalculation
+   */
+  public void setUsageRatio(String partition, Resource clusterResource) {
+    // Recompute from the users via calculateUserUsageRatio and store the
+    // result as this partition's ratio.
+    qUsageRatios.setUsageRatio(partition,
+        calculateUserUsageRatio(clusterResource, partition));
+  }
+
+  /**
+   * Update Queue Usage Ratio by applying a delta to the value currently held
+   * for the partition; nothing is recalculated from the users here.
+   *
+   * @param nodePartition
+   *          Node Partition
+   * @param delta
+   *          Delta to increment, passed through to
+   *          {@code qUsageRatios.incUsageRatio}
+   */
+  public void updateQueueUsageRatio(String nodePartition, float delta) {
+    qUsageRatios.incUsageRatio(nodePartition, delta);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index e0ac56f..7dcdf58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -63,7 +63,7 @@
     maxApplications = q.getMaxApplications();
     maxApplicationsPerUser = q.getMaxApplicationsPerUser();
     userLimit = q.getUserLimit();
-    users = new UsersInfo(q.getUsers());
+    users = new UsersInfo(q.getUsersManager().getUsersInfo());
     userLimitFactor = q.getUserLimitFactor();
     AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
     usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a09a33c..9fa344c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -359,9 +361,10 @@ private void mockApplications(String appsConfig) { queue.getQueueCapacities().getAbsoluteCapacity()); HashSet users = userMap.get(queue.getQueueName()); Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); - for (String user : users) { - when(queue.getUserLimitPerUser(eq(user), 
any(Resource.class), - anyString())).thenReturn(userLimit); + for (String userName : users) { + when(queue.getResourceLimitForActiveUsers(eq(userName), + any(Resource.class), any(User.class), anyString(), + any(SchedulingMode.class))).thenReturn(userLimit); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 0a864fd..732b5d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -184,7 +184,7 @@ private void checkUserUsedResource(MockRM rm, String queueName, String userName, String partition, int memory) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName); - LeafQueue.User user = queue.getUser(userName); + UsersManager.User user = queue.getUser(userName); Assert.assertEquals(memory, user.getResourceUsage().getUsed(partition).getMemorySize()); } @@ -241,7 +241,7 @@ public RMNodeLabelsManager createNodeLabelManager() { LeafQueue queue = (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler()) .getQueue("a"); - ArrayList users = queue.getUsers(); + ArrayList users = queue.getUsersManager().getUsersInfo(); for (UserInfo userInfo : users) { if (userInfo.getUsername().equals("user")) { ResourceInfo resourcesUsed = 
userInfo.getResourcesUsed(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index f1396b6..c69cb88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -77,10 +77,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -525,19 +523,22 @@ public void testSingleQueueWithOneUser() throws Exception { // Users final String user_0 = "user_0"; + // Active Users Manager + ActiveUsersManager activeUserManager = a.getActiveUsersManager(); + // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new 
FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext); + activeUserManager, spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext); + activeUserManager, spyRMContext); a.submitApplicationAttempt(app_1, user_0); // same user @@ -836,7 +837,7 @@ public void testDRFUserLimits() throws Exception { / (numNodes * 100.0f) + queueUser1.getUsed().getMemorySize() / (numNodes * 8.0f * GB); - assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001); // Add another node and make sure consumedRatio is adjusted // accordingly. numNodes = 3; @@ -850,7 +851,7 @@ public void testDRFUserLimits() throws Exception { / (numNodes * 100.0f) + queueUser1.getUsed().getMemorySize() / (numNodes * 8.0f * GB); - assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001); } @Test -- 2.10.1 (Apple Git-78)