diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index d3e2f8981a8..de1d2493d92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -112,8 +112,13 @@ public int compare(CSQueue q1, CSQueue q2) { return rc; } - float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); - float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + PriorityQueueResourcesForSorting q1Sort = + new PriorityQueueResourcesForSorting(q1, p); + PriorityQueueResourcesForSorting q2Sort = + new PriorityQueueResourcesForSorting(q2, p); + + float q1AbsCapacity = q1Sort.absoluteCapacity; + float q2AbsCapacity = q2Sort.absoluteCapacity; //If q1's abs capacity > 0 and q2 is 0, then prioritize q1 if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity, @@ -127,28 +132,31 @@ public int compare(CSQueue q1, CSQueue q2) { q2AbsCapacity, 0f) == 0) { // both q1 has 0 and q2 has 0 capacity, then fall back to using // priority, abs used capacity to prioritize - float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p); - float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); + float used1 = q1Sort.absoluteUsedCapacity; + float used2 = q2Sort.absoluteUsedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1.getPriority().getPriority(), q2.getPriority().getPriority()); } else{ // both q1 has positive abs capacity and q2 has positive abs // capacity - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); + float used1 = q1Sort.usedCapacity; + float used2 = q2Sort.usedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1.getPriority().getPriority(), q2.getPriority().getPriority()); } } - private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, - String partition) { + private int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort, float q1Used, + float q2Used, int q1Prior, int q2Prior) { int p1 = 0; int p2 = 0; if (respectPriority) { - p1 = q1.getPriority().getPriority(); - p2 = q2.getPriority().getPriority(); + p1 = q1Prior; + p2 = q2Prior; } int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used, @@ -158,16 +166,16 @@ private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, // capacity goes first if (0 == rc) { Resource minEffRes1 = - q1.getQueueResourceQuotas().getConfiguredMinResource(partition); + q1Sort.configuredMinResource; Resource minEffRes2 = - q2.getQueueResourceQuotas().getConfiguredMinResource(partition); + q2Sort.configuredMinResource; if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals( Resources.none())) { return minEffRes2.compareTo(minEffRes1); } - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition); + float abs1 = q1Sort.absoluteCapacity; + float abs2 = q2Sort.absoluteCapacity; return Float.compare(abs2, abs1); } @@ -203,6 +211,32 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, } } + // A internal class to take a snapshot of queue capacities related + // for safe sorting which doesn't need hold scheduler's lock + // The partition is thread local so just get it. + private class PriorityQueueResourcesForSorting { + private String partition; + private float absoluteUsedCapacity; + private float usedCapacity; + private Resource configuredMinResource; + private float absoluteCapacity; + private CSQueue refQueue; + + PriorityQueueResourcesForSorting(CSQueue queue, String partition) { + this.refQueue = queue; + this.partition = partition; + this.absoluteUsedCapacity = + queue.getQueueCapacities().getAbsoluteUsedCapacity(partition); + this.usedCapacity = + queue.getQueueCapacities().getUsedCapacity(partition); + this.absoluteCapacity = + queue.getQueueCapacities().getAbsoluteCapacity(partition); + this.configuredMinResource = + queue.getQueueResourceQuotas().getConfiguredMinResource(partition); + } + + } + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { this.respectPriority = respectPriority; }