diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 713962b..99b2f2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -115,6 +115,7 @@ public void testIgnore() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 0, 0, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -133,6 +134,7 @@ public void testProportionalPreemption() { int[][] qData = new int[][]{ // / A B C D { 100, 10, 40, 20, 30 }, // abs + { 100, 100, 100, 100, 100 }, // maxCap { 100, 30, 60, 10, 0 }, // used { 45, 20, 5, 20, 0 }, // pending { 0, 0, 0, 0, 0 }, // reserved @@ -144,12 +146,33 @@ public void testProportionalPreemption() { policy.editSchedule(); verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA))); } + + @Test + public void testMaxCap() { + int[][] qData = new int[][]{ + // / A B C + { 100, 40, 40, 20 }, // abs + { 100, 100, 45, 100 }, // maxCap + { 100, 55, 45, 0 }, // used + { 20, 10, 10, 0 }, // pending + { 0, 0, 0, 0 }, // reserved + { 2, 1, 1, 0 }, // apps + { -1, 1, 1, 0 }, // req granularity + { 3, 0, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // despite the imbalance, since B is at maxCap, do not correct + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + } + @Test public void testPreemptCycle() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -169,6 +192,7 @@ public void testExpireKill() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 0, 60, 40 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -205,6 +229,7 @@ public void testDeadzone() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 39, 43, 21 }, // used { 10, 10, 0, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -224,6 +249,7 @@ public void testOverCapacityImbalance() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 55, 45, 0 }, // used { 20, 10, 10, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -242,6 +268,7 @@ public void testNaturalTermination() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 55, 45, 0 }, // used { 20, 10, 10, 0 }, // pending { 0, 0, 0, 0 }, // reserved @@ -261,6 +288,7 @@ public void testObserveOnly() { int[][] qData = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs + { 100, 100, 100, 100 }, // maxCap { 100, 90, 10, 0 }, // used { 80, 10, 20, 50 }, // pending { 0, 0, 0, 0 }, // reserved @@ -280,6 +308,7 @@ public void testHierarchical() { int[][] qData = new int[][] { // / A B C D E F { 200, 100, 50, 50, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap { 200, 110, 60, 50, 90, 90, 0 }, // used { 10, 0, 0, 0, 10, 0, 10 }, // pending { 0, 0, 0, 0, 0, 0, 0 }, // reserved @@ -295,10 +324,54 @@ public void testHierarchical() { } @Test + public void testZeroGuar() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 0, 99, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 170, 80, 60, 20, 90, 90, 0 }, // used + { 10, 0, 0, 0, 10, 0, 10 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify capacity taken from A1, not B1 despite B1 being far over + // its absolute guaranteed capacity + verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test + public void testZeroGuarOverCap() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 0, 99, 0, 100, 100 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 170, 170, 60, 20, 90, 0, 0 }, // used + { 85, 50, 30, 10, 10, 20, 20 }, // pending + { 0, 0, 0, 0, 0, 0, 0 }, // reserved + { 4, 3, 1, 1, 1, 1, 1 }, // apps + { -1, -1, 1, 1, 1, -1, 1 }, // req granularity + { 2, 3, 0, 0, 0, 1, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // we verify both that C has priority on B and D (has it has >0 guarantees) + // and that B and D are force to share their over capacity fairly (as they + // are both zero-guarantees) hence D sees some of its containers preempted + verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC))); + } + + + + @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { // / A B C D E F G H I - { 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap { 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used { 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved @@ -382,24 +455,25 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { when(mCS.getRootQueue()).thenReturn(mRoot); Resource clusterResources = - Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0); + Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); when(mCS.getClusterResources()).thenReturn(clusterResources); return policy; } ParentQueue buildMockRootQueue(Random r, int[]... queueData) { int[] abs = queueData[0]; - int[] used = queueData[1]; - int[] pending = queueData[2]; - int[] reserved = queueData[3]; - int[] apps = queueData[4]; - int[] gran = queueData[5]; - int[] queues = queueData[6]; - - return mockNested(abs, used, pending, reserved, apps, gran, queues); + int[] maxCap = queueData[1]; + int[] used = queueData[2]; + int[] pending = queueData[3]; + int[] reserved = queueData[4]; + int[] apps = queueData[5]; + int[] gran = queueData[6]; + int[] queues = queueData[7]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); } - ParentQueue mockNested(int[] abs, int[] used, + ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { float tot = leafAbsCapacities(abs, queues); Deque pqs = new LinkedList(); @@ -407,6 +481,8 @@ ParentQueue mockNested(int[] abs, int[] used, when(root.getQueueName()).thenReturn("/"); when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); + when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); + for (int i = 1; i < queues.length; ++i) { final CSQueue q; final ParentQueue p = pqs.removeLast(); @@ -420,6 +496,7 @@ ParentQueue mockNested(int[] abs, int[] used, when(q.getQueueName()).thenReturn(queueName); when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); + when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); } assert 0 == pqs.size(); return root; @@ -439,7 +516,7 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues, return pq; } - LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, + LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { LeafQueue lq = mock(LeafQueue.class); when(lq.getTotalResourcePending()).thenReturn(