diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 77df059..7f68382 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -872,7 +872,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, if (curQueue instanceof LeafQueue) { LeafQueue l = (LeafQueue) curQueue; Resource pending = - l.getQueueResourceUsage().getPending(partitionToLookAt); + l.getTotalResourcePending(partitionResource, partitionToLookAt); ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, maxCapacity, preemptionDisabled, partitionToLookAt); if (preemptionDisabled) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ff1baff..f171e96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1366,14 +1367,49 @@ public void recoverContainer(Resource clusterResource, return orderingPolicy.getSchedulableEntities(); } - // return a single Resource capturing the overal amount of pending resources - public synchronized Resource getTotalResourcePending() { - Resource ret = BuilderUtils.newResource(0, 0); - for (FiCaSchedulerApp f : - orderingPolicy.getSchedulableEntities()) { - Resources.addTo(ret, f.getTotalPendingRequests()); + // Consider the headroom for each user in the queue. + // Total pending for the queue = + // sum for each user(min( (user's headroom), sum(user's pending requests) )) + public synchronized Resource getTotalResourcePending( + Resource resources, String partition) { + Map<String, HashSet<FiCaSchedulerApp>> appsPerUser = + new HashMap<String, HashSet<FiCaSchedulerApp>>(); + for (FiCaSchedulerApp f : getApplications()) { + String userName = f.getUser(); + HashSet<FiCaSchedulerApp> apps = appsPerUser.get(userName); + if (apps == null) { + apps = new HashSet<FiCaSchedulerApp>(); + appsPerUser.put(userName, apps); + } + apps.add(f); + } + + Resource queuePending = Resource.newInstance(0, 0); + for (String userName : appsPerUser.keySet()) { + Resource userHeadroom = null; + Resource userPending = Resource.newInstance(0, 0); + for (FiCaSchedulerApp f : appsPerUser.get(userName)) { + // Get headroom for this user the first time through this loop.
+ if (userHeadroom == null) { + Resource queueCurrentLimit; + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + } + User user = getUser(userName); + userHeadroom = getHeadroom(user, queueCurrentLimit, resources, + computeUserLimit(f, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); + } + Resources.addTo(userPending, + f.getAppAttemptResourceUsage().getPending(partition)); + } + // Make sure headroom is not negative. + userHeadroom = Resources.componentwiseMax( + userHeadroom, Resources.none()); + Resources.addTo(queuePending, + Resources.componentwiseMin(userHeadroom, userPending)); } - return ret; + return queuePending; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8d9f48a..3e61b58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1150,7 +1150,8 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, ResourceCalculator rc = mCS.getResourceCalculator(); List<ApplicationAttemptId> appAttemptIdList = new ArrayList<ApplicationAttemptId>(); - when(lq.getTotalResourcePending()).thenReturn(pending[i]); + when(lq.getTotalResourcePending(isA(Resource.class), isA(String.class))) + .thenReturn(pending[i]); // need to set pending
resource in resource usage as well ResourceUsage ru = new ResourceUsage(); ru.setPending(pending[i]); @@ -1292,7 +1293,8 @@ void printString(CSQueue nq, String indent) { } } else { System.out.println(indent + nq.getQueueName() - + " pen:" + ((LeafQueue) nq).getTotalResourcePending() + + " pen:" + ((LeafQueue) nq) + .getTotalResourcePending(isA(Resource.class), isA(String.class)) + " cur:" + nq.getAbsoluteUsedCapacity() + " guar:" + nq.getAbsoluteCapacity() );