diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 738f527..c1c3dce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -30,7 +30,6 @@ import java.util.PriorityQueue; import java.util.Set; -import org.apache.commons.collections.map.HashedMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -527,6 +526,7 @@ private void resetCapacity(ResourceCalculator rc, Resource clusterResource, List skippedAMContainerlist = new ArrayList(); for (TempQueue qT : queues) { + if (qT.preemptionDisabled) continue; // we act only if we are violating balance by more than // maxIgnoredOverCapacity if (Resources.greaterThan(rc, clusterResource, qT.current, @@ -734,6 +734,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { float absUsed = root.getAbsoluteUsedCapacity(); float absCap = root.getAbsoluteCapacity(); float absMaxCap = root.getAbsoluteMaximumCapacity(); + boolean preemptionDisabled = root.getPreemptionDisabled(); Resource current = Resources.multiply(clusterResources, absUsed); Resource guaranteed = Resources.multiply(clusterResources, absCap); @@ -747,8 +748,8 @@ private TempQueue cloneQueues(CSQueue root, Resource
clusterResources) { LeafQueue l = (LeafQueue) root; Resource pending = l.getTotalResourcePending(); ret = new TempQueue(queueName, current, pending, guaranteed, - maxCapacity); - if (root.getPreemptionDisabled()) { + maxCapacity, preemptionDisabled); + if (preemptionDisabled) { ret.untouchableExtra = extra; } else { ret.preemptableExtra = extra; @@ -757,7 +758,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { } else { Resource pending = Resource.newInstance(0, 0); ret = new TempQueue(root.getQueueName(), current, pending, guaranteed, - maxCapacity); + maxCapacity, false); Resource childrensPreemptable = Resource.newInstance(0, 0); for (CSQueue c : root.getChildQueues()) { TempQueue subq = cloneQueues(c, clusterResources); @@ -816,9 +817,10 @@ public int compare(TempQueue o1, TempQueue o2) { final ArrayList children; LeafQueue leafQueue; + boolean preemptionDisabled; TempQueue(String queueName, Resource current, Resource pending, - Resource guaranteed, Resource maxCapacity) { + Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) { this.queueName = queueName; this.current = current; this.pending = pending; @@ -831,6 +833,7 @@ public int compare(TempQueue o1, TempQueue o2) { this.children = new ArrayList(); this.untouchableExtra = Resource.newInstance(0, 0); this.preemptableExtra = Resource.newInstance(0, 0); + this.preemptionDisabled = preemptionDisabled; } public void setLeafQueue(LeafQueue l){ @@ -863,9 +866,14 @@ public void addChildren(ArrayList queues) { Resource offer(Resource avail, ResourceCalculator rc, Resource clusterResource) { // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + Resource absMaxCapIdealAssignedDelta = Resource.newInstance(0, 0); + if (Resources.greaterThanOrEqual( + rc, clusterResource, maxCapacity, idealAssigned)) { + absMaxCapIdealAssignedDelta = Resources.subtract(maxCapacity, idealAssigned); // NOTE(review): the guard above is an rc comparison only; under a multi-resource rc one component of this delta may still be negative, so confirm whether a component-wise clamp at zero is needed + } Resource accepted = Resources.min(rc, clusterResource, -
Resources.subtract(maxCapacity, idealAssigned), + absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, Resources.subtract( Resources.add(current, pending), idealAssigned))); Resource remain = Resources.subtract(avail, accepted); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 696b9bb..8f5237e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -532,6 +532,30 @@ public void testPerQueueDisablePreemptionRootDisablesAll() { } @Test + public void testPerQueueDisablePreemptionOverAbsMaxCapacity() { + int[][] qData = new int[][] { + // / A D + // B C E F + {1000, 725, 360, 365, 275, 17, 258 }, // absCap + {1000,1000,1000,1000, 550, 109,1000 }, // absMaxCap + {1000, 741, 396, 345, 259, 110, 149 }, // used + { 40, 20, 0, 20, 20, 20, 0 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + // appA appB appC appD + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + // QueueE inherits non-preemption from QueueD + schedConf.setPreemptionDisabled("root.queueD", true); + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // appC is running on QueueE.
QueueE is over absMaxCap, but is not + // preemptable. Therefore, appC resources should not be preempted. + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + @Test public void testOverCapacityImbalance() { int[][] qData = new int[][]{ // / A B C