diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 30245586a0d..45002a54620 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -585,7 +585,7 @@ private void untrackContainerForPreemption(RMContainer container) {
     }
   }
 
-  boolean canContainerBePreempted(RMContainer container) {
+  boolean canContainerBePreempted(RMContainer container, Resource alreadyConsideringForPreemption) {
     if (!isPreemptable()) {
       return false;
     }
@@ -615,8 +615,9 @@ boolean canContainerBePreempted(RMContainer container) {
     }
 
-    // Subtract this container's allocation to compute usage after preemption
+    // Compute usage after preempting all containers chosen from this app
+    Resource totalThatWouldBePreempted = Resources.add(alreadyConsideringForPreemption, container.getAllocatedResource());
     Resources.subtractFrom(
-        usageAfterPreemption, container.getAllocatedResource());
+        usageAfterPreemption, totalThatWouldBePreempted);
 
     return !isUsageBelowShare(usageAfterPreemption, getFairShare());
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index b3e59c53dae..1709567d863 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -29,7 +30,9 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Lock;
@@ -134,6 +137,14 @@ public void run() {
           containersToPreempt.addAll(bestContainers.containers);
           // Reserve the containers for the starved app
           trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
+          // Warn application about containers to be killed
+          for (RMContainer container : bestContainers.containers) {
+            ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+            FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
+            LOG.info("Preempting container " + container +
+                " from queue " + app.getQueueName());
+            app.trackContainerForPreemption(container);
+          }
         }
       }
     } // End of iteration over RRs
@@ -167,17 +178,27 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
         Resources.clone(node.getUnallocatedResource()),
         node.getTotalReserved());
 
+    Map<ApplicationId, Resource> consideringForPreemption = new HashMap<>();
+
     for (RMContainer container : containersToCheck) {
       FSAppAttempt app =
           scheduler.getSchedulerApp(container.getApplicationAttemptId());
+      ApplicationId appId = app.getApplicationId();
+
+      if (consideringForPreemption.get(appId) == null) {
+        consideringForPreemption.put(appId, Resources.none());
+      }
 
-      if (app.canContainerBePreempted(container)) {
+      if (app.canContainerBePreempted(container, consideringForPreemption.get(appId))) {
         // Flag container for preemption
         if (!preemptableContainers.addContainer(container)) {
           return null;
         }
 
-        Resources.addTo(potential, container.getAllocatedResource());
+        Resource containerResources = container.getAllocatedResource();
+        Resources.addTo(potential, containerResources);
+        Resource planningToPreemptBeforeThisContainer = consideringForPreemption.get(appId);
+        consideringForPreemption.put(appId, Resources.add(planningToPreemptBeforeThisContainer, containerResources));
       }
 
       // Check if we have already identified enough containers
@@ -199,15 +220,6 @@ private void trackPreemptionsAgainstNode(List<RMContainer> containers,
   }
 
   private void preemptContainers(List<RMContainer> containers) {
-    // Warn application about containers to be killed
-    for (RMContainer container : containers) {
-      ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
-      FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      LOG.info("Preempting container " + container +
-          " from queue " + app.getQueueName());
-      app.trackContainerForPreemption(container);
-    }
-
     // Schedule timer task to kill containers
     preemptionTimer.schedule(
         new PreemptContainersTask(containers), warnTimeBeforeKill);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 31630240e28..b381ffa7497 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -402,6 +402,34 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
   }
 
   @Test
+  public void testAppNotPreemptedBelowFairShare() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    setNumAMContainersPerNode(1);
+    tryPreemptMoreThanFairShare("root.preemptable.child-2");
+  }
+
+  private void tryPreemptMoreThanFairShare(String queueName)
+      throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(3 * GB, 3, queueName, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    // Move the clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+    Thread.sleep(10000);
+
+    // The total cluster memory is 8GB, and the fair share of each queue is 4GB.
+    // If we had preempted more than one 3GB container, we would have pushed
+    // greedyApp below its fair share, which shouldn't be possible.
+    assertEquals(5, greedyApp.getLiveContainers().size());
+
+    sendEnoughNodeUpdatesToAssignFully();
+    assertEquals(1, starvingApp.getLiveContainers().size());
+  }
+
   @Test
   public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
       throws InterruptedException {
     // Run this test only for fairshare preemption