diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index abcb1a2..a72df65 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -113,11 +113,18 @@ * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; + /** + * Only consider a queue's request for resources if the current + pending is + * greater than a configurable percentage of the current capacity. + */ + public static final String MAX_PENDING_OVER_CAPACITY = + "yarn.resourcemanager.monitor.capacity.preemption.max_pending_over_capacity"; private RMContext rmContext; private final Clock clock; private double maxIgnoredOverCapacity; + private double maxPendingOverCapacity; private long maxWaitTime; private CapacityScheduler scheduler; private long monitoringInterval; @@ -156,6 +163,7 @@ public void init(Configuration config, RMContext context, rmContext = context; scheduler = (CapacityScheduler) sched; maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1); + maxPendingOverCapacity = config.getDouble(MAX_PENDING_OVER_CAPACITY, 0.1); naturalTerminationFactor = config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2); maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000); @@ -757,7 +765,17 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { } if (root instanceof LeafQueue) { LeafQueue l = (LeafQueue) root; - Resource pending = l.getTotalResourcePending(); + Resource pending = l.getTotalResourcePending(clusterResources); + // Only consider a queue's request for resources if the + // current + pending is greater than a configurable percentage of the + // current capacity. + if (!Resources.equals(pending, Resources.none())) { + if (Resources.lessThanOrEqual(rc, clusterResources, + Resources.add(current, pending), + Resources.multiply(current, 1.0 + maxPendingOverCapacity))) { + pending = Resource.newInstance(0,0); + } + } ret = new TempQueue(queueName, current, pending, guaranteed, maxCapacity, preemptionDisabled); if (preemptionDisabled) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 59b9d21..5c28187 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1826,12 +1826,40 @@ public void recoverContainer(Resource clusterResource, } // return a single Resource capturing the overal amount of pending resources - public synchronized Resource getTotalResourcePending() { - Resource ret = BuilderUtils.newResource(0, 0); + // Consider the headroom for each user in the queue. + // Total pending for the queue = + // sum for each user(min( (user's headroom), sum(user's pending requests) )) + public synchronized Resource getTotalResourcePending(Resource resources) { + Map> appsPerUser = + new HashMap>(); for (FiCaSchedulerApp f : activeApplications) { - Resources.addTo(ret, f.getTotalPendingRequests()); + String userName = f.getUser(); + HashSet apps = appsPerUser.get(userName); + if (apps == null) { + apps = new HashSet(); + appsPerUser.put(userName, apps); + } + apps.add(f); + } + + Resource queuePending = Resource.newInstance(0, 0); + for (String userName : appsPerUser.keySet()) { + Resource userHeadroom = null; + Resource userPending = Resource.newInstance(0, 0); + for (FiCaSchedulerApp f : appsPerUser.get(userName)) { + // Get headroom for this user the first time through this loop. + if (userHeadroom == null) { + userHeadroom = f.getHeadroom(); + } + Resources.addTo(userPending, f.getTotalPendingRequests()); + } + // Make sure headroom is not negative. + userHeadroom = Resources.componentwiseMax( + userHeadroom, Resources.none()); + Resources.addTo(queuePending, + Resources.componentwiseMin(userHeadroom, userPending)); } - return ret; + return queuePending; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8e9545d..abb69e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -90,6 +90,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import com.google.inject.matcher.Matchers; + public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; @@ -1032,7 +1034,7 @@ LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, LeafQueue lq = mock(LeafQueue.class); List appAttemptIdList = new ArrayList(); - when(lq.getTotalResourcePending()).thenReturn( + when(lq.getTotalResourcePending(isA(Resource.class))).thenReturn( Resource.newInstance(pending[i], 0)); // consider moving where CapacityScheduler::comparator accessible NavigableSet qApps = new TreeSet( @@ -1145,7 +1147,8 @@ void printString(CSQueue nq, String indent) { } } else { System.out.println(indent + nq.getQueueName() - + " pen:" + ((LeafQueue) nq).getTotalResourcePending() + + " pen:" + ((LeafQueue) nq) + .getTotalResourcePending(isA(Resource.class)) + " cur:" + nq.getAbsoluteUsedCapacity() + " guar:" + nq.getAbsoluteCapacity() );