diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 42c45ad..192dbcf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -30,7 +30,8 @@ LogFactory.getLog(DefaultResourceCalculator.class); @Override - public int compare(Resource unused, Resource lhs, Resource rhs) { + protected int compare(Resource unused, Resource lhs, Resource rhs, + boolean singleType) { // Only consider memory return Long.compare(lhs.getMemorySize(), rhs.getMemorySize()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 9f1c8d7..bd73fbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -51,17 +51,18 @@ LogFactory.getLog(DominantResourceCalculator.class); @Override - public int compare(Resource clusterResource, Resource lhs, Resource rhs) { + protected int compare(Resource clusterResource, Resource lhs, Resource rhs, + boolean singleType) { if (lhs.equals(rhs)) { return 0; } if (isInvalidDivisor(clusterResource)) { - if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs - .getVirtualCores()) - || (lhs.getMemorySize() > rhs.getMemorySize() && 
lhs.getVirtualCores() < rhs - .getVirtualCores())) { + if ((lhs.getMemorySize() < rhs.getMemorySize() && + lhs.getVirtualCores() > rhs.getVirtualCores()) || + (lhs.getMemorySize() > rhs.getMemorySize() && + lhs.getVirtualCores() < rhs.getVirtualCores())) { return 0; } else if (lhs.getMemorySize() > rhs.getMemorySize() || lhs.getVirtualCores() > rhs.getVirtualCores()) { @@ -79,7 +80,7 @@ public int compare(Resource clusterResource, Resource lhs, Resource rhs) { return -1; } else if (l > r) { return 1; - } else { + } else if (!singleType) { l = getResourceAsValue(clusterResource, lhs, false); r = getResourceAsValue(clusterResource, rhs, false); if (l < r) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 50ce04c..16e67e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -28,8 +28,30 @@ @Unstable public abstract class ResourceCalculator { - public abstract int - compare(Resource clusterResource, Resource lhs, Resource rhs); + /** + * On a cluster with capacity {@code clusterResource}, compare {@code lhs} + * and {@code rhs}. Consider all resources unless {@code singleType} is set + * to true. + */ + protected abstract int compare( + Resource clusterResource, Resource lhs, Resource rhs, boolean singleType); + + /** + * On a cluster with capacity {@code clusterResource}, compare {@code lhs} + * and {@code rhs} considering all resources. 
+ */ + public int compare(Resource clusterResource, Resource lhs, Resource rhs) { + return compare(clusterResource, lhs, rhs, false); + } + + /** + * On a cluster with capacity {@code clusterResource}, compare {@code lhs} + * and {@code rhs} considering a single resource type. + */ + public int compareSingleType( + Resource clusterResource, Resource lhs, Resource rhs) { + return compare(clusterResource, lhs, rhs, true); + } public static int divideAndCeil(int a, int b) { if (b == 0) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 6ed0660..1fe2998 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -605,8 +604,7 @@ boolean canContainerBePreempted(RMContainer container) { Resource usageAfterPreemption = Resources.subtract( getResourceUsage(), container.getAllocatedResource()); - return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(), - scheduler.getClusterResource(), usageAfterPreemption, getFairShare()); + return !isStarvedForFairShare(usageAfterPreemption); } /** @@ -857,7 +855,9 @@ private Resource assignContainer( } 
private boolean isReservable(Resource capacity) { - return scheduler.isAtLeastReservationThreshold( + // Reserve only when the app is starved and the requested container size + // meets the scheduler's reservation threshold + return isStarved() && scheduler.isAtLeastReservationThreshold( getQueue().getPolicy().getResourceCalculator(), capacity); } @@ -1089,34 +1089,45 @@ boolean assignReservedContainer(FSSchedulerNode node) { * @return freshly computed fairshare starvation */ Resource fairShareStarvation() { - Resource threshold = Resources.multiply( - getFairShare(), fsQueue.getFairSharePreemptionThreshold()); - Resource starvation = Resources.componentwiseMin(threshold, demand); - Resources.subtractFromNonNegative(starvation, getResourceUsage()); - long now = scheduler.getClock().getTime(); - boolean starved = !Resources.isNone(starvation); + boolean starved = isStarvedForFairShare(); if (!starved) { lastTimeAtFairShare = now; } - if (starved && - (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) { - this.fairshareStarvation = starvation; + if (!starved || + now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) { + fairshareStarvation = Resources.none(); } else { - this.fairshareStarvation = Resources.none(); + Resource threshold = Resources.multiply( + getFairShare(), fsQueue.getFairSharePreemptionThreshold()); + Resource starvation = Resources.componentwiseMin(threshold, demand); + Resources.subtractFromNonNegative(starvation, getResourceUsage()); + fairshareStarvation = starvation; } - return this.fairshareStarvation; + return fairshareStarvation; + } + + /** + * Helper method that checks if {@code usage} is below this app's fairshare + * according to the policy's single-type resource comparison. + */ + private boolean isStarvedForFairShare(Resource usage) { + return 0 > fsQueue.getPolicy().getResourceCalculator().compareSingleType( + scheduler.getClusterResource(), usage, getFairShare()); } /** * Helper method that captures if this app is identified to be starved. 
* @return true if the app is starved for fairshare, false otherwise */ - @VisibleForTesting boolean isStarvedForFairShare() { - return !Resources.isNone(fairshareStarvation); + return isStarvedForFairShare(getResourceUsage()); + } + + boolean isStarved() { + return isStarvedForFairShare() || !Resources.isNone(minshareStarvation); } /** @@ -1333,6 +1344,11 @@ public boolean equals(Object o) { } @Override + public String toString() { + return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption(); + } + + @Override public boolean isPreemptable() { return getQueue().isPreemptable(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 6f04cb7..369b8a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -155,8 +155,12 @@ public int compare(Schedulable s1, Schedulable s2) { resourceOrder1, resourceOrder2); } if (res == 0) { - // Apps are tied in fairness ratio. Break the tie by submit time. - res = (int)(s1.getStartTime() - s2.getStartTime()); + // Apps are tied in fairness ratio. Break the tie by submit time and job + // name to get a deterministic ordering, which is useful for unit tests. 
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } } return res; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 9036a03..f8cdb45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -131,8 +131,9 @@ else if (s1Needy && s2Needy) // Apps are tied in fairness ratio. Break the tie by submit time and job // name to get a deterministic ordering, which is useful for unit tests. 
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - if (res == 0) + if (res == 0) { res = s1.getName().compareTo(s2.getName()); + } } return res; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 480a329..b894d2d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -72,7 +72,7 @@ {"MinSharePreemptionWithDRF", 1}, {"FairSharePreemption", 2}, {"FairSharePreemptionWithDRF", 3} - }); + }); } public TestFairSchedulerPreemption(String name, int mode) @@ -110,6 +110,7 @@ private void writeAllocFile() throws IOException { * |--- preemptable * |--- child-1 * |--- child-2 + * |--- preemptable-2 * |--- nonpreemptible * |--- child-1 * |--- child-2 @@ -133,6 +134,10 @@ private void writeAllocFile() throws IOException { out.println(""); // end of preemptable queue + out.println(""); + writePreemptionParams(out); + out.println(""); + // Queue with preemption disallowed out.println(""); out.println("false" + @@ -269,10 +274,11 @@ private void submitApps(String queue1, String queue2) preemptHalfResources(queue2); } - private void verifyPreemption() throws InterruptedException { + private void verifyPreemption(int numStarvedAppContainers) + throws InterruptedException { // Sleep long enough for four containers to be preempted. 
for (int i = 0; i < 1000; i++) { - if (greedyApp.getLiveContainers().size() == 4) { + if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) { break; } Thread.sleep(10); @@ -280,13 +286,13 @@ private void verifyPreemption() throws InterruptedException { // Verify the right amount of containers are preempted from greedyApp assertEquals("Incorrect number of containers on the greedy app", - 4, greedyApp.getLiveContainers().size()); + 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size()); sendEnoughNodeUpdatesToAssignFully(); // Verify the preempted containers are assigned to starvingApp assertEquals("Starved app is not assigned the right number of containers", - 2, starvingApp.getLiveContainers().size()); + numStarvedAppContainers, starvingApp.getLiveContainers().size()); } private void verifyNoPreemption() throws InterruptedException { @@ -305,7 +311,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { String queue = "root.preemptable.child-1"; submitApps(queue, queue); if (fairsharePreemption) { - verifyPreemption(); + verifyPreemption(2); } else { verifyNoPreemption(); } @@ -314,13 +320,13 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { @Test public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { submitApps("root.preemptable.child-1", "root.preemptable.child-2"); - verifyPreemption(); + verifyPreemption(2); } @Test public void testPreemptionBetweenNonSiblingQueues() throws Exception { submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); - verifyPreemption(); + verifyPreemption(2); } @Test @@ -354,7 +360,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception { setNumAMContainersPerNode(2); preemptHalfResources("root.preemptable.child-2"); - verifyPreemption(); + verifyPreemption(2); ArrayList containers = (ArrayList) starvingApp.getLiveContainers(); @@ -365,4 +371,24 @@ public void testPreemptionSelectNonAMContainer() throws Exception { 
assertTrue("Preempted containers should come from two different " + "nodes.", !host0.equals(host1)); } + + @Test + public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() + throws InterruptedException { + // Run this test only for fairshare preemption + if (!fairsharePreemption) { + return; + } + + // Let one of the child queues take over the entire cluster + takeAllResources("root.preemptable.child-1"); + + // Submit a job so half the resources go to parent's sibling + preemptHalfResources("root.preemptable-2"); + verifyPreemption(2); + + // Submit a job to the child's sibling to force preemption from the child + preemptHalfResources("root.preemptable.child-2"); + verifyPreemption(1); + } }