diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 42c45ad..192dbcf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -30,7 +30,8 @@
LogFactory.getLog(DefaultResourceCalculator.class);
@Override
- public int compare(Resource unused, Resource lhs, Resource rhs) {
+ protected int compare(Resource unused, Resource lhs, Resource rhs,
+ boolean singleType) {
// Only consider memory
return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9f1c8d7..bd73fbf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -51,17 +51,18 @@
LogFactory.getLog(DominantResourceCalculator.class);
@Override
- public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+ protected int compare(Resource clusterResource, Resource lhs, Resource rhs,
+ boolean singleType) {
if (lhs.equals(rhs)) {
return 0;
}
if (isInvalidDivisor(clusterResource)) {
- if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
- .getVirtualCores())
- || (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
- .getVirtualCores())) {
+ if ((lhs.getMemorySize() < rhs.getMemorySize() &&
+ lhs.getVirtualCores() > rhs.getVirtualCores()) ||
+ (lhs.getMemorySize() > rhs.getMemorySize() &&
+ lhs.getVirtualCores() < rhs.getVirtualCores())) {
return 0;
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
@@ -79,7 +80,7 @@ public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
return -1;
} else if (l > r) {
return 1;
- } else {
+ } else if (!singleType) {
l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false);
if (l < r) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 50ce04c..16e67e1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -28,8 +28,30 @@
@Unstable
public abstract class ResourceCalculator {
- public abstract int
- compare(Resource clusterResource, Resource lhs, Resource rhs);
+ /**
+ * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+ * and {@code rhs}. Consider all resources unless {@code singleType} is set
+ * to true.
+ */
+ protected abstract int compare(
+ Resource clusterResource, Resource lhs, Resource rhs, boolean singleType);
+
+ /**
+ * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+ * and {@code rhs} considering all resources.
+ */
+ public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
+ return compare(clusterResource, lhs, rhs, false);
+ }
+
+ /**
+ * On a cluster with capacity {@code clusterResource}, compare {@code lhs}
+ * and {@code rhs} considering a single resource type.
+ */
+ public int compareSingleType(
+ Resource clusterResource, Resource lhs, Resource rhs) {
+ return compare(clusterResource, lhs, rhs, true);
+ }
public static int divideAndCeil(int a, int b) {
if (b == 0) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6ed0660..1fe2998 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -605,8 +604,7 @@ boolean canContainerBePreempted(RMContainer container) {
Resource usageAfterPreemption = Resources.subtract(
getResourceUsage(), container.getAllocatedResource());
- return !Resources.lessThan(fsQueue.getPolicy().getResourceCalculator(),
- scheduler.getClusterResource(), usageAfterPreemption, getFairShare());
+ return !isStarvedForFairShare(usageAfterPreemption);
}
/**
@@ -857,7 +855,9 @@ private Resource assignContainer(
}
private boolean isReservable(Resource capacity) {
- return scheduler.isAtLeastReservationThreshold(
+ // Reserve only when the app is starved and the requested container size
+ // is larger than the configured threshold
+ return isStarved() && scheduler.isAtLeastReservationThreshold(
getQueue().getPolicy().getResourceCalculator(), capacity);
}
@@ -1089,34 +1089,45 @@ boolean assignReservedContainer(FSSchedulerNode node) {
* @return freshly computed fairshare starvation
*/
Resource fairShareStarvation() {
- Resource threshold = Resources.multiply(
- getFairShare(), fsQueue.getFairSharePreemptionThreshold());
- Resource starvation = Resources.componentwiseMin(threshold, demand);
- Resources.subtractFromNonNegative(starvation, getResourceUsage());
-
long now = scheduler.getClock().getTime();
- boolean starved = !Resources.isNone(starvation);
+ boolean starved = isStarvedForFairShare();
if (!starved) {
lastTimeAtFairShare = now;
}
- if (starved &&
- (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
- this.fairshareStarvation = starvation;
+ if (!starved ||
+ now - lastTimeAtFairShare < fsQueue.getFairSharePreemptionTimeout()) {
+ fairshareStarvation = Resources.none();
} else {
- this.fairshareStarvation = Resources.none();
+ Resource threshold = Resources.multiply(
+ getFairShare(), fsQueue.getFairSharePreemptionThreshold());
+ Resource starvation = Resources.componentwiseMin(threshold, demand);
+ Resources.subtractFromNonNegative(starvation, getResourceUsage());
+ fairshareStarvation = starvation;
}
- return this.fairshareStarvation;
+ return fairshareStarvation;
+ }
+
+ /**
+ * Helper method that checks if {@code usage} corresponds to fairshare
+ * starvation.
+ */
+ private boolean isStarvedForFairShare(Resource usage) {
+ return 0 > fsQueue.getPolicy().getResourceCalculator().compareSingleType(
+ scheduler.getClusterResource(), usage, getFairShare());
}
/**
* Helper method that captures if this app is identified to be starved.
* @return true if the app is starved for fairshare, false otherwise
*/
- @VisibleForTesting
boolean isStarvedForFairShare() {
- return !Resources.isNone(fairshareStarvation);
+ return isStarvedForFairShare(getResourceUsage());
+ }
+
+ boolean isStarved() {
+ return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
}
/**
@@ -1333,6 +1344,11 @@ public boolean equals(Object o) {
}
@Override
+ public String toString() {
+ return getApplicationAttemptId() + " Alloc: " + getCurrentConsumption();
+ }
+
+ @Override
public boolean isPreemptable() {
return getQueue().isPreemptable();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 6f04cb7..369b8a1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -155,8 +155,12 @@ public int compare(Schedulable s1, Schedulable s2) {
resourceOrder1, resourceOrder2);
}
if (res == 0) {
- // Apps are tied in fairness ratio. Break the tie by submit time.
- res = (int)(s1.getStartTime() - s2.getStartTime());
+ // Apps are tied in fairness ratio. Break the tie by submit time and job
+ // name to get a deterministic ordering, which is useful for unit tests.
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ if (res == 0) {
+ res = s1.getName().compareTo(s2.getName());
+ }
}
return res;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 9036a03..f8cdb45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -131,8 +131,9 @@ else if (s1Needy && s2Needy)
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
- if (res == 0)
+ if (res == 0) {
res = s1.getName().compareTo(s2.getName());
+ }
}
return res;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 480a329..b894d2d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -72,7 +72,7 @@
{"MinSharePreemptionWithDRF", 1},
{"FairSharePreemption", 2},
{"FairSharePreemptionWithDRF", 3}
- });
+ });
}
public TestFairSchedulerPreemption(String name, int mode)
@@ -110,6 +110,7 @@ private void writeAllocFile() throws IOException {
* |--- preemptable
* |--- child-1
* |--- child-2
+ * |--- preemptable-2
* |--- nonpreemptible
* |--- child-1
* |--- child-2
@@ -133,6 +134,10 @@ private void writeAllocFile() throws IOException {
out.println(""); // end of preemptable queue
+ out.println("");
+ writePreemptionParams(out);
+ out.println("");
+
// Queue with preemption disallowed
out.println("");
out.println("false" +
@@ -269,10 +274,11 @@ private void submitApps(String queue1, String queue2)
preemptHalfResources(queue2);
}
- private void verifyPreemption() throws InterruptedException {
+ private void verifyPreemption(int numStarvedAppContainers)
+ throws InterruptedException {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) {
- if (greedyApp.getLiveContainers().size() == 4) {
+ if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
break;
}
Thread.sleep(10);
@@ -280,13 +286,13 @@ private void verifyPreemption() throws InterruptedException {
// Verify the right amount of containers are preempted from greedyApp
assertEquals("Incorrect number of containers on the greedy app",
- 4, greedyApp.getLiveContainers().size());
+ 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
sendEnoughNodeUpdatesToAssignFully();
// Verify the preempted containers are assigned to starvingApp
assertEquals("Starved app is not assigned the right number of containers",
- 2, starvingApp.getLiveContainers().size());
+ numStarvedAppContainers, starvingApp.getLiveContainers().size());
}
private void verifyNoPreemption() throws InterruptedException {
@@ -305,7 +311,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
- verifyPreemption();
+ verifyPreemption(2);
} else {
verifyNoPreemption();
}
@@ -314,13 +320,13 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
- verifyPreemption();
+ verifyPreemption(2);
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
- verifyPreemption();
+ verifyPreemption(2);
}
@Test
@@ -354,7 +360,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
- verifyPreemption();
+ verifyPreemption(2);
ArrayList containers =
(ArrayList) starvingApp.getLiveContainers();
@@ -365,4 +371,24 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
assertTrue("Preempted containers should come from two different "
+ "nodes.", !host0.equals(host1));
}
+
+ @Test
+ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
+ throws InterruptedException {
+ // Run this test only for fairshare preemption
+ if (!fairsharePreemption) {
+ return;
+ }
+
+ // Let one of the child queues take over the entire cluster
+ takeAllResources("root.preemptable.child-1");
+
+ // Submit a job so half the resources go to parent's sibling
+ preemptHalfResources("root.preemptable-2");
+ verifyPreemption(2);
+
+ // Submit a job to the child's sibling to force preemption from the child
+ preemptHalfResources("root.preemptable.child-2");
+ verifyPreemption(1);
+ }
}