diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index d3e2f8981a8..2becd18e710 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -20,6 +20,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels .RMNodeLabelsManager; @@ -34,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * For two queues with the same priority: @@ -101,19 +103,20 @@ public static int compare(double relativeAssigned1, double relativeAssigned2, /** * Comparator that both looks at priority and utilization */ - private class PriorityQueueComparator implements Comparator { + private class PriorityQueueComparator implements Comparator { @Override - public int compare(CSQueue q1, CSQueue q2) { + public int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort) { String p = partitionToLookAt.get(); - int rc = compareQueueAccessToPartition(q1, q2, p); + int rc = 
compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p); if (0 != rc) { return rc; } - float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); - float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + float q1AbsCapacity = q1Sort.absoluteCapacity; + float q2AbsCapacity = q2Sort.absoluteCapacity; //If q1's abs capacity > 0 and q2 is 0, then prioritize q1 if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity, @@ -127,28 +130,32 @@ public int compare(CSQueue q1, CSQueue q2) { q2AbsCapacity, 0f) == 0) { // both q1 has 0 and q2 has 0 capacity, then fall back to using // priority, abs used capacity to prioritize - float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p); - float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); + float used1 = q1Sort.absoluteUsedCapacity; + float used2 = q2Sort.absoluteUsedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1Sort.queue.getPriority().getPriority(), q2Sort.queue.getPriority().getPriority()); } else{ // both q1 has positive abs capacity and q2 has positive abs // capacity - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); + float used1 = q1Sort.usedCapacity; + float used2 = q2Sort.usedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1Sort.queue.getPriority().getPriority(), + q2Sort.queue.getPriority().getPriority()); } } - private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, - String partition) { + private int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort, float q1Used, + float q2Used, int q1Prior, int q2Prior) { int p1 = 0; int p2 = 0; if (respectPriority) { - p1 = q1.getPriority().getPriority(); - p2 = q2.getPriority().getPriority(); + p1 = q1Prior; + p2 = q2Prior; } int rc = 
PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used, @@ -158,16 +165,16 @@ private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, // capacity goes first if (0 == rc) { Resource minEffRes1 = - q1.getQueueResourceQuotas().getConfiguredMinResource(partition); + q1Sort.configuredMinResource; Resource minEffRes2 = - q2.getQueueResourceQuotas().getConfiguredMinResource(partition); + q2Sort.configuredMinResource; if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals( Resources.none())) { return minEffRes2.compareTo(minEffRes1); } - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition); + float abs1 = q1Sort.absoluteCapacity; + float abs2 = q2Sort.absoluteCapacity; return Float.compare(abs2, abs1); } @@ -203,6 +210,33 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, } } + // An internal class that takes a snapshot of the queue capacities needed + // for safe sorting, so that sorting doesn't need to hold the scheduler's lock. + // The partition is thread local so just get it. 
+ public static class PriorityQueueResourcesForSorting { + private float absoluteUsedCapacity; + private float usedCapacity; + private Resource configuredMinResource; + private float absoluteCapacity; + private CSQueue queue; + + PriorityQueueResourcesForSorting(CSQueue queue) { + this.queue = queue; + this.absoluteUsedCapacity = + queue.getQueueCapacities().getAbsoluteUsedCapacity(partitionToLookAt.get()); + this.usedCapacity = + queue.getQueueCapacities().getUsedCapacity(partitionToLookAt.get()); + this.absoluteCapacity = + queue.getQueueCapacities().getAbsoluteCapacity(partitionToLookAt.get()); + this.configuredMinResource = + queue.getQueueResourceQuotas().getConfiguredMinResource(partitionToLookAt.get()); + } + + public CSQueue getQueue() { + return queue; + } + } + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { this.respectPriority = respectPriority; } @@ -217,11 +251,134 @@ public void setQueues(List queues) { // Since partitionToLookAt is a thread local variable, and every time we // copy and sort queues, so it's safe for multi-threading environment. PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); + + // Take snapshots of ParentQueue's PriorityQueueResourcesForSorting + List sortedQueueResources = queues.stream(). + map(queue -> new PriorityQueueResourcesForSorting(queue)). + collect(Collectors.toList()); + + Collections.sort(sortedQueueResources, new PriorityQueueComparator()); + // Return the reference queue iterator + return sortedQueueResources.stream(). + map(sorting -> sorting.getQueue()). + collect(Collectors.toList()).iterator(); + } + + // Just for test performance side effect/regression + @Deprecated + public Iterator getOldAssignmentIterator(String partition) { + // Since partitionToLookAt is a thread local variable, and every time we + // copy and sort queues, so it's safe for multi-threading environment. 
+ PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); List sortedQueue = new ArrayList<>(queues); - Collections.sort(sortedQueue, new PriorityQueueComparator()); + Collections.sort(sortedQueue, new PriorityQueueComparatorOld()); return sortedQueue.iterator(); } + // Just for test performance side effect/regression + @Deprecated + private class PriorityQueueComparatorOld implements Comparator { + + @Override + public int compare(CSQueue q1, CSQueue q2) { + String p = partitionToLookAt.get(); + + int rc = compareQueueAccessToPartition(q1, q2, p); + if (0 != rc) { + return rc; + } + + float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); + float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + + //If q1's abs capacity > 0 and q2 is 0, then prioritize q1 + if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity, + 0f) == 0) { + return -1; + //If q2's abs capacity > 0 and q1 is 0, then prioritize q2 + } else if (Float.compare(q2AbsCapacity, 0f) > 0 && Float.compare( + q1AbsCapacity, 0f) == 0) { + return 1; + } else if (Float.compare(q1AbsCapacity, 0f) == 0 && Float.compare( + q2AbsCapacity, 0f) == 0) { + // both q1 has 0 and q2 has 0 capacity, then fall back to using + // priority, abs used capacity to prioritize + float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p); + float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); + + return compare(q1, q2, used1, used2, p); + } else{ + // both q1 has positive abs capacity and q2 has positive abs + // capacity + float used1 = q1.getQueueCapacities().getUsedCapacity(p); + float used2 = q2.getQueueCapacities().getUsedCapacity(p); + + return compare(q1, q2, used1, used2, p); + } + } + + private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, + String partition) { + + int p1 = 0; + int p2 = 0; + if (respectPriority) { + p1 = q1.getPriority().getPriority(); + p2 = q2.getPriority().getPriority(); + } + + int rc = 
PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used, + p1, p2); + + // For queue with same used ratio / priority, queue with higher configured + // capacity goes first + if (0 == rc) { + Resource minEffRes1 = + q1.getQueueResourceQuotas().getConfiguredMinResource(partition); + Resource minEffRes2 = + q2.getQueueResourceQuotas().getConfiguredMinResource(partition); + if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals( + Resources.none())) { + return minEffRes2.compareTo(minEffRes1); + } + + float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition); + float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition); + return Float.compare(abs2, abs1); + } + + return rc; + } + + private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, + String partition) { + // Everybody has access to default partition + if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) { + return 0; + } + + /* + * Check accessible to given partition, if one queue accessible and + * the other not, accessible queue goes first. 
+ */ + boolean q1Accessible = + q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels() + .contains(partition) || q1.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY); + boolean q2Accessible = + q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels() + .contains(partition) || q2.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY); + if (q1Accessible && !q2Accessible) { + return -1; + } else if (!q1Accessible && q2Accessible) { + return 1; + } + + return 0; + } + } + @Override public String getConfigName() { if (respectPriority) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java index 5d68482929e..ef4def3309e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java @@ -20,7 +20,9 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableTable; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; @@ -30,11 +32,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Random; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestPriorityUtilizationQueueOrderingPolicy { + private final static Random random = new Random(System.currentTimeMillis()); + private List mockCSQueues(String[] queueNames, int[] priorities, float[] utilizations, float[] absCapacities, String partition) { // sanity check @@ -55,6 +60,8 @@ when(q.getPriority()).thenReturn(Priority.newInstance(priorities[i])); QueueResourceQuotas qr = new QueueResourceQuotas(); + qr.setConfiguredMinResource(Resource. + newInstance(random.nextInt(1000), random.nextInt(1000))); when(q.getQueueResourceQuotas()).thenReturn(qr); list.add(q); } @@ -250,4 +257,66 @@ public void testPriorityUtilizationOrdering() { verifyOrder(policy, "x", new String[] { "e", "c", "d", "b", "a" }); } + + // This is test side effect for YARN-10178 + @Test + public void testPriorityQueueResourcesForSortingPerformance(){ + PriorityUtilizationQueueOrderingPolicy policy = + new PriorityUtilizationQueueOrderingPolicy(true); + + String[] queueNames = new String[1000]; + int[] priorities = new int[1000]; + float[] utilizations = new float[1000]; + float[] absCapacities = new float[1000]; + String partition = "test"; + + // test mock 1000 queues for performance test + for (int i = 0; i < 1000; ++i) { + queueNames[i] = "queue" + i; + priorities[i] = i % 10; + utilizations[i] = random.nextFloat(); + absCapacities[i] = random.nextFloat(); + } + + policy.setQueues(mockCSQueues(queueNames, priorities, utilizations, absCapacities, partition)); + + long startOld = Time.monotonicNow(); + policy.getOldAssignmentIterator(partition); + long durationOld = Time.monotonicNow() - startOld; + + long start = 
Time.monotonicNow(); + policy.getAssignmentIterator(partition); + long duration = Time.monotonicNow() - start; + + // with 1000 queues, the time gap should be less than 1s + Assert.assertEquals(duration, durationOld, 1000); + + // test mock 10000 queues for performance test + String[] queueNamesMore = new String[10000]; + int[] prioritiesMore = new int[10000]; + float[] utilizationsMore = new float[10000]; + float[] absCapacitiesMore = new float[10000]; + + // fill in the 10000 mock queues for the performance test + for (int i = 0; i < 10000; ++i) { + queueNamesMore[i] = "queue" + i; + prioritiesMore[i] = i % 10; + utilizationsMore[i] = random.nextFloat(); + absCapacitiesMore[i] = random.nextFloat(); + } + + policy.setQueues(mockCSQueues(queueNamesMore, prioritiesMore, utilizationsMore, absCapacitiesMore, partition)); + + long startOldMore = Time.monotonicNow(); + policy.getOldAssignmentIterator(partition); + long durationOldMore = Time.monotonicNow() - startOldMore; + + long startMore = Time.monotonicNow(); + policy.getAssignmentIterator(partition); + long durationMore = Time.monotonicNow() - startMore; + + // with 10000 queues, the time gap should be less than 10s + Assert.assertEquals(durationMore, durationOldMore, 10000); + + } }