diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index 054438765e7..204cba9dfae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -107,23 +107,32 @@ public int compare(CSQueue q1, CSQueue q2) { return rc; } - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); - int p1 = 0; - int p2 = 0; - if (respectPriority) { - p1 = q1.getPriority().getPriority(); - p2 = q2.getPriority().getPriority(); - } + float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); + float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + if (q1AbsCapacity > 0 && q2AbsCapacity > 0) { + + float used1 = q1.getQueueCapacities().getUsedCapacity(p); + float used2 = q2.getQueueCapacities().getUsedCapacity(p); + int p1 = 0; + int p2 = 0; + if (respectPriority) { + p1 = q1.getPriority().getPriority(); + p2 = q2.getPriority().getPriority(); + } - rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); + rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); - // For queue with same used ratio / priority, queue with higher configured - // capacity goes first - if (0 == rc) { - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); - return Float.compare(abs2, abs1); + // For queue with same used ratio / priority, queue with higher configured + // capacity goes first + if (0 == rc) { + return Float.compare(q2AbsCapacity, q1AbsCapacity); + } + } else if ( q1AbsCapacity > 0) { + return -1; + } else if ( q2AbsCapacity > 0) { + return 1; + } else { + return 0; } return rc; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java index e3c108a581e..f557567dd67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java @@ -35,7 +35,7 @@ public class TestPriorityUtilizationQueueOrderingPolicy { private List mockCSQueues(String[] queueNames, int[] priorities, - float[] utilizations, String partition) { + float[] utilizations, float[] absCapacities, String partition) { // sanity check assert queueNames != null && priorities != null && utilizations != null && queueNames.length > 0 && queueNames.length == priorities.length @@ -47,6 +47,7 @@ when(q.getQueueName()).thenReturn(queueNames[i]); QueueCapacities qc = new QueueCapacities(false); + qc.setAbsoluteCapacity(partition, absCapacities[i]); qc.setUsedCapacity(partition, utilizations[i]); when(q.getQueueCapacities()).thenReturn(qc); @@ -78,41 +79,45 @@ public void testUtilizationOrdering() { // Case 1, one queue policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 0 }, - new float[] { 0.1f }, "")); + new float[] { 0.1f }, new float[] {0.2f}, "")); verifyOrder(policy, "", new String[] { "a" }); // Case 2, 2 queues policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 0, 0 }, - new float[] { 0.1f, 0.0f }, "")); + new float[] { 0.1f, 0.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 3, 3 queues policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 0, 0, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "")); verifyOrder(policy, "", new String[] { "b", "a", "c" }); // Case 4, 3 queues, ignore priority policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "")); verifyOrder(policy, "", new String[] { "b", "a", "c" }); // Case 5, 3 queues, look at partition (default) policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "x")); verifyOrder(policy, "", new String[] { "a", "b", "c" }); // Case 5, 3 queues, look at partition (x) policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "x")); verifyOrder(policy, "x", new String[] { "b", "a", "c" }); // Case 6, 3 queues, with different accessibility to partition List queues = mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x"); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, "x"); // a can access "x" when(queues.get(0).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y")); // c can access "x" @@ -128,89 +133,93 @@ public void testPriorityUtilizationOrdering() { // Case 1, one queue policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 1 }, - new float[] { 0.1f }, "")); + new float[] { 0.1f }, new float[] {0.2f}, "")); verifyOrder(policy, "", new String[] { "a" }); // Case 2, 2 queues, both under utilized, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.2f, 0.1f }, "")); + new float[] { 0.2f, 0.1f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 3, 2 queues, both over utilized, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 1.1f, 1.2f }, "")); + new float[] { 1.1f, 1.2f },new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 4, 2 queues, one under and one over, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 5, 2 queues, both over utilized, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 1.1f, 1.2f }, "")); + new float[] { 1.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 6, 2 queues, both under utilized, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 0.2f }, "")); + new float[] { 0.1f, 0.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 7, 2 queues, one under utilized and one over utilized, // different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 8, 2 queues, one under utilized and one over utilized, // different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 9, 2 queues, one under utilized and one meet, different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 10, 2 queues, one under utilized and one meet, different priority (2) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 11, 2 queues, one under utilized and one meet, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 12, 2 queues, both meet, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 1.0f, 1.0f }, "")); + new float[] { 1.0f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 13, 5 queues, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "")); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "")); verifyOrder(policy, "", new String[] { "e", "c", "b", "a", "d" }); // Case 14, 5 queues, different priority, partition default; policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x")); - verifyOrder(policy, "", new String[] { "e", "b", "a", "c", "d" }); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x")); + verifyOrder(policy, "", new String[] { "a", "b", "c", "d", "e" }); // Case 15, 5 queues, different priority, partition x; policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x")); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x")); verifyOrder(policy, "x", new String[] { "e", "c", "b", "a", "d" }); // Case 16, 5 queues, different priority, partition x; and different // accessibility List queues = mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x"); // Only a/d has access to x when(queues.get(0).getAccessibleNodeLabels()).thenReturn( ImmutableSet.of("x")); @@ -218,5 +227,16 @@ public void testPriorityUtilizationOrdering() { ImmutableSet.of("x")); policy.setQueues(queues); verifyOrder(policy, "x", new String[] { "a", "d", "e", "c", "b" }); + + // Case 17, 2 queues, one's abs capacity is 0 + policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, + new float[] { 0.1f, 1.2f }, new float[] {0.0f, 0.3f}, "")); + verifyOrder(policy, "", new String[] { "b", "a" }); + + // Case 18, 2 queues, one's abs capacity is 0 + policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, + new float[] { 0.1f, 1.2f }, new float[] {0.3f, 0.0f}, "")); + verifyOrder(policy, "", new String[] { "a", "b" }); + } }