From 53e41caf08df508ae51913413497a8e1f3fe37a3 Mon Sep 17 00:00:00 2001
From: Sunil G
Date: Thu, 5 Jan 2017 17:29:22 +0530
Subject: [PATCH] YARN-5889

---
 .../resourcemanager/UserToPartitionRecord.java     | 106 +++++++++++
 .../capacity/FifoIntraQueuePreemptionPlugin.java   |   6 +-
 .../scheduler/ActiveUsersManager.java              |  47 ++++-
 .../scheduler/capacity/LeafQueue.java              | 200 ++++++++++++++++++---
 ...ionalCapacityPreemptionPolicyMockFramework.java |  10 +-
 5 files changed, 338 insertions(+), 31 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java
new file mode 100644
index 0000000..a2b6344
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/UserToPartitionRecord.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.Objects;
+
+/**
+ * Key identifying a (user name, node partition) pair, used to index
+ * pre-computed user-limits.
+ *
+ * <p>Equality, hashing and ordering are all based on the values of the two
+ * fields. Instances are mutable through the setters, so a record must not be
+ * modified while it is in use as a key of a hash-based or sorted collection.
+ */
+public class UserToPartitionRecord
+    implements Comparable<UserToPartitionRecord> {
+
+  private String userName;
+  private String partition;
+
+  /**
+   * Create a record for the given user and partition.
+   *
+   * @param userName name of the user
+   * @param partition name of the node partition
+   * @return a new record holding both values
+   */
+  public static UserToPartitionRecord newInstance(String userName,
+      String partition) {
+    UserToPartitionRecord userRecord = new UserToPartitionRecord();
+    userRecord.setPartition(partition);
+    userRecord.setUserName(userName);
+    return userRecord;
+  }
+
+  public String getPartition() {
+    return partition;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  /**
+   * Order records by user name first and by partition second.
+   * Both fields must be non-null on both records.
+   */
+  @Override
+  public int compareTo(UserToPartitionRecord o) {
+    int result = this.getUserName().compareTo(o.getUserName());
+    if (result == 0) {
+      return this.getPartition().compareTo(o.getPartition());
+    } else {
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    UserToPartitionRecord other = (UserToPartitionRecord) obj;
+    // Compare by value, not by reference: callers build a new record for
+    // each lookup, so equal names are usually distinct String instances.
+    return Objects.equals(getUserName(), other.getUserName())
+        && Objects.equals(getPartition(), other.getPartition());
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 493217;
+    int result = 8501;
+    // Objects.hashCode maps null to 0, keeping this consistent with equals.
+    result = prime * result + Objects.hashCode(this.getUserName());
+    result = prime * result + Objects.hashCode(this.getPartition());
+    return result;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 757f567..a313465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
 
       // Verify whether we already calculated headroom for this user.
       if (userLimitResource == null) {
-        userLimitResource = Resources.clone(tq.leafQueue
-            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+        userLimitResource = Resources.clone(tq.leafQueue.getComputedUserLimit(
+            userName, clusterResource, tq.leafQueue.getUser(userName),
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
 
         Resource amUsed = perUserAMUsed.get(userName);
         if (null == amUsed) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e6858..c146cd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -45,7 +47,8 @@
   private int activeUsers = 0;
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
-  
+  private boolean userAddedOrRemoved = false;
+
   public ActiveUsersManager(QueueMetrics metrics) {
     this.metrics = metrics;
   }
@@ -65,6 +68,7 @@ synchronized public void activateApplication(
       usersApplications.put(user, userApps);
       ++activeUsers;
       metrics.incrActiveUsers();
+      resetUserAddedOrRemoved(true);
       LOG.debug("User " + user + " added to activeUsers, currently: " +
           activeUsers);
     }
@@ -91,12 +95,13 @@ synchronized public void deactivateApplication(
         usersApplications.remove(user);
         --activeUsers;
         metrics.decrActiveUsers();
+        resetUserAddedOrRemoved(true);
         LOG.debug("User " + user + " removed from activeUsers, currently: " +
             activeUsers);
       }
     }
   }
-  
+
   /**
    * Get number of active users i.e. users with applications which have pending
    * resource requests.
@@ -106,4 +111,42 @@ synchronized public void deactivateApplication(
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }
+
+  /**
+   * Get active users i.e. users with applications which have pending
+   * resource requests.
+   * @return read-only snapshot of active user names, safe to iterate unlocked
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public Collection<String> getActiveUsers() {
+    return Collections.unmodifiableCollection(
+        new HashSet<String>(usersApplications.keySet()));
+  }
+
+  /**
+   * Check whether the given user is currently active.
+   * @param user user name
+   * @return whether given user is active or not.
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public boolean isAnActiveUser(String user) {
+    return usersApplications.containsKey(user);
+  }
+
+  /**
+   * Check whether any user has become active or non-active.
+   * @return true if a user was added or removed since the last reset.
+   */
+  synchronized public boolean isUserAddedOrRemoved() {
+    return userAddedOrRemoved;
+  }
+
+  /**
+   * Record or clear the "active users changed" flag.
+   * @param reset new value of the flag
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public void resetUserAddedOrRemoved(boolean reset) {
+    this.userAddedOrRemoved = reset;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1c6471f..3ee9c0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.UserToPartitionRecord;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -139,6 +140,16 @@
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
       new ConcurrentHashMap<>();
 
+  // Pre-computed user-limits, keyed by (user, partition), then scheduling mode.
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedActiveUserLimit = new ConcurrentHashMap<>();
+  Map<UserToPartitionRecord, Map<SchedulingMode, Resource>>
+      preComputedUserLimit = new ConcurrentHashMap<>();
+
+  // userCount to detect whether there is change in user count for every user-limit
+  // calculation.
+  private int lastUserCount = 0;
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -1262,7 +1273,7 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application,
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application.getUser(), clusterResource, user,
+        getComputedActiveUserLimit(application.getUser(), clusterResource, user,
             partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
         partition);
   }
@@ -1335,9 +1346,8 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
 
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
-    Resource userLimit =
-        computeUserLimit(application.getUser(), clusterResource, queueUser,
-            nodePartition, schedulingMode);
+    Resource userLimit = getComputedActiveUserLimit(application.getUser(),
+        clusterResource, queueUser, nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
 
@@ -1373,10 +1383,141 @@ public boolean getRackLocalityFullReset() {
     return rackLocalityFullReset;
   }
 
+  /**
+   * Get the user-limit computed from the resources used by active users only,
+   * recomputing it first if it is not cached.
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param user
+   *          user who owns the applications in this queue
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedActiveUserLimit(String userName,
+      Resource clusterResource, User user, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    // In allocation thread, cross check whether user became active or not.
+    // Such a change needs recomputation for all user-limits.
+    if (activeUsersManager.isUserAddedOrRemoved()) {
+      // Reset the flag before clearing, so that a change which arrives while
+      // the cache is being cleared is picked up by the next call.
+      activeUsersManager.resetUserAddedOrRemoved(false);
+      preComputedActiveUserLimit.clear();
+    }
+
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedActiveUserLimit.get(userRecord);
+
+    if (userLimitPerSchedulingMode == null) {
+      userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition,
+          clusterResource, true);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
+
+  /**
+   * Get the user-limit computed from currentCapacity, i.e. without restricting
+   * usage to active users, recomputing it first if it is not cached.
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param user
+   *          user who owns the applications in this queue
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedUserLimit(String userName,
+      Resource clusterResource, User user, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    // In compute thread, cross check whether a user has been added or removed.
+    // Such a change needs recomputation for all user-limits.
+    int currentUserCount = users.size();
+    if (lastUserCount != currentUserCount) {
+      // Clear all computed user-limits and remember the user count just read.
+      preComputedUserLimit.clear();
+      lastUserCount = currentUserCount;
+    }
+
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedUserLimit.get(userRecord);
+
+    if (userLimitPerSchedulingMode == null) {
+      userLimitPerSchedulingMode = reComputeUserLimits(userName, nodePartition,
+          clusterResource, false);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
+
+  /**
+   * Drop the cached user-limits of the given user in the given partition.
+   * NOTE(review): limits cached for other users stay untouched although they
+   * also depend on queue-wide usage -- confirm this staleness is acceptable.
+   * @param userName name of the user whose cached limits are dropped
+   * @param nodePartition partition name
+   */
+  public void invalidateComputedUserLimit(String userName,
+      String nodePartition) {
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+    preComputedActiveUserLimit.remove(userRecord);
+    preComputedUserLimit.remove(userRecord);
+  }
+
+  /** Compute and cache the user-limit of a user for both scheduling modes. */
+  private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
+      String nodePartition, Resource clusterResource, boolean activeMode) {
+    UserToPartitionRecord userRecord = UserToPartitionRecord
+        .newInstance(userName, nodePartition);
+
+    // compute user-limit per scheduling mode.
+    Resource userLimitWithExclusivity = computeUserLimit(userName,
+        clusterResource, getUser(userName), nodePartition,
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, activeMode);
+    Resource userLimitWithOutExclusivity = computeUserLimit(userName,
+        clusterResource, getUser(userName), nodePartition,
+        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY, activeMode);
+
+    // Fill a fresh map and publish it only once it is complete.
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        new ConcurrentHashMap<>();
+    userLimitPerSchedulingMode.put(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+        userLimitWithExclusivity);
+    userLimitPerSchedulingMode.put(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY,
+        userLimitWithOutExclusivity);
+
+    // Store the result in the cache that matches the requested mode.
+    if (activeMode) {
+      preComputedActiveUserLimit.put(userRecord, userLimitPerSchedulingMode);
+    } else {
+      preComputedUserLimit.put(userRecord, userLimitPerSchedulingMode);
+    }
+
+    return userLimitPerSchedulingMode;
+  }
+
   @Lock(NoLock.class)
-  private Resource computeUserLimit(String userName,
-      Resource clusterResource, User user,
-      String nodePartition, SchedulingMode schedulingMode) {
+  private Resource computeUserLimit(String userName, Resource clusterResource,
+      User user, String nodePartition, SchedulingMode schedulingMode,
+      boolean activeUser) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
         clusterResource);
 
@@ -1404,7 +1545,6 @@ private Resource computeUserLimit(String userName,
             queueCapacity,
             required);
 
-
     /* We want to base the userLimit calculation on
      * max(queueCapacity, usedResources+required). However, we want
      * usedResources to be based on the combined ratios of all the users in the
@@ -1428,16 +1568,28 @@ private Resource computeUserLimit(String userName,
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than
     // queue-hard-limit * ulMin
-    
-    final int activeUsers = activeUsersManager.getNumActiveUsers();
-    
+
+    int activeUsersCount = activeUsersManager.getNumActiveUsers();
+    Resource resourceUsed = currentCapacity;
+    // Active mode: sum the usage of active users, skipping users already gone.
+    if (activeUser) {
+      resourceUsed = Resources.createResource(0, 0);
+      for (String activeUserName : activeUsersManager.getActiveUsers()) {
+        User activeUserInfo = getUser(activeUserName);
+        if (activeUserInfo != null) {
+          Resources.addTo(resourceUsed,
+              activeUserInfo.getResourceUsage().getUsed(nodePartition));
+        }
+      }
+    }
+
     // User limit resource is determined by:
     // max{currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
         resourceCalculator, partitionResource,
         Resources.divideAndCeil(
-            resourceCalculator, currentCapacity, activeUsers),
+            resourceCalculator, resourceUsed, activeUsersCount),
         Resources.divideAndCeil(
             resourceCalculator,
             Resources.multiplyAndRoundDown(
@@ -1485,7 +1637,7 @@ private Resource computeUserLimit(String userName,
           " qconsumed: " + queueUsage.getUsed() +
           " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
-          " activeUsers: " + activeUsers +
+          " activeUsers: " + activeUsersCount +
           " clusterCapacity: " + clusterResource +
           " resourceByLabel: " + partitionResource +
           " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
@@ -1746,6 +1898,9 @@ void allocateResource(Resource clusterResource,
 
       user.assignContainer(resource, nodePartition);
 
+      // New container is allocated. Invalidate user-limit for given user.
+      invalidateComputedUserLimit(userName, nodePartition);
+
       // Update usage ratios
       updateQueueUsageRatio(nodePartition,
           user.updateUsageRatio(resourceCalculator, resourceByLabel,
@@ -1795,6 +1950,9 @@ void releaseResource(Resource clusterResource,
       User user = getUserAndAddIfAbsent(userName);
       user.releaseContainer(resource, nodePartition);
 
+      // Container is released. Invalidate user-limit.
+      invalidateComputedUserLimit(userName, nodePartition);
+
       // Update usage ratios
       updateQueueUsageRatio(nodePartition,
           user.updateUsageRatio(resourceCalculator, resourceByLabel,
@@ -1857,6 +2015,10 @@ public void updateClusterResource(Resource clusterResource,
       // activate the pending applications if possible
       activateApplications();
 
+      // In case of any resource change, clear the computed user-limit.
+      preComputedUserLimit.clear();
+      preComputedActiveUserLimit.clear();
+
       // Update application properties
       for (FiCaSchedulerApp application : orderingPolicy
           .getSchedulableEntities()) {
@@ -2151,7 +2313,7 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
         if (!userNameToHeadroom.containsKey(userName)) {
           User user = getUser(userName);
           Resource headroom = Resources.subtract(
-              computeUserLimit(app.getUser(), resources, user, partition,
+              getComputedActiveUserLimit(app.getUser(), resources, user, partition,
                   SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
               user.getUsed(partition));
           // Make sure headroom is not negative.
@@ -2173,16 +2335,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
 
   }
 
-  public synchronized Resource getUserLimitPerUser(String userName,
-      Resource resources, String partition) {
-
-    // Check user resource limit
-    User user = getUser(userName);
-
-    return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-  }
-
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 8663315..2a9496a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -42,8 +42,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -360,9 +362,11 @@ private void mockApplications(String appsConfig) {
           queue.getQueueCapacities().getAbsoluteCapacity());
       HashSet<String> users = userMap.get(queue.getQueueName());
       Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
-      for (String user : users) {
-        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
-            anyString())).thenReturn(userLimit);
+      for (String userName : users) {
+        when(queue.getComputedUserLimit(eq(userName), any(Resource.class),
+            any(User.class), anyString(),
+            eq(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)))
+            .thenReturn(userLimit);
       }
     }
   }
-- 
2.7.4 (Apple Git-66)