diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1f47b5f..1c8fe02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -894,8 +894,10 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue, ret.untouchableExtra = Resource.newInstance(0, 0); } else { ret.untouchableExtra = - Resources.subtractFrom(extra, childrensPreemptable); + Resources.subtract(extra, childrensPreemptable); } + ret.preemptableExtra = Resources.min( + rc, partitionResource, childrensPreemptable, extra); } } addTempQueuePartition(ret); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 6c0ed6c..2d103e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -39,11 +39,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Random; import java.util.StringTokenizer; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -119,6 +122,8 @@ ApplicationId.newInstance(TS, 4), 0); final ApplicationAttemptId appF = ApplicationAttemptId.newInstance( ApplicationId.newInstance(TS, 4), 0); + final ApplicationAttemptId appG = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(TS, 5), 0); final ArgumentCaptor evtCaptor = ArgumentCaptor.forClass(ContainerPreemptEvent.class); @@ -862,6 +867,66 @@ public void testAMResourcePercentForSkippedAMContainers() { verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); setAMContainer = false; } + + @Test + public void testHierarchicalLarge3Levels() { + int[][] qData = new int[][] { + // / A F I + // B C G H J K + // D E + { 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap + { 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used + { 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending + { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved + // appA appB appC appD appE appF appG + { 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // XXX note: compensating for rounding error in Resources.multiplyTo + // which is likely triggered since we use small numbers for readability + //run with Logger.getRootLogger().setLevel(Level.DEBUG); + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC))); + + Map> queueToPartitions = null; + queueToPartitions = getQueuePartitions(policy, queueToPartitions); + assertEquals(10, queueToPartitions.get("queueE").get("").preemptableExtra.getMemory()); + //2nd level child(E) preempts 10, but parent A has only 9 extra + //check the parent can prempt only the extra from > 2 level child + assertEquals(0, queueToPartitions.get("queueA").get("").untouchableExtra.getMemory()); + assertEquals(9, queueToPartitions.get("queueA").get("").preemptableExtra.getMemory()); + } + + /** + * Gets the queue partitions. + * + * @param policy + * the policy + * @param queueToPartitions + * the queue to partitions + * @return the queue partitions + */ + private + Map> getQueuePartitions( + ProportionalCapacityPreemptionPolicy policy, + Map> queueToPartitions) { + try { + Field queueToPartitionsF = null; + queueToPartitionsF = + ProportionalCapacityPreemptionPolicy.class + .getDeclaredField("queueToPartitions"); + queueToPartitionsF.setAccessible(true); + queueToPartitions = + (Map>) queueToPartitionsF + .get(policy); + } catch (Exception e) { + assertTrue("Unable to fetch the policy queues", false); + } + return queueToPartitions; + } static class IsPreemptionRequestFor extends ArgumentMatcher {