diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 94991ebb14c..236884240ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -588,7 +588,8 @@ private void untrackContainerForPreemption(RMContainer container) {
     }
   }
 
-  boolean canContainerBePreempted(RMContainer container) {
+  boolean canContainerBePreempted(RMContainer container,
+      Resource alreadyConsideringForPreemption) {
     if (!isPreemptable()) {
       return false;
     }
@@ -617,9 +618,14 @@ boolean canContainerBePreempted(RMContainer container) {
       Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
     }
 
-    // Subtract this container's allocation to compute usage after preemption
+    // Account for this container and other containers that the
+    // FSPreemptionThread is already considering for preemption.
+    Resource totalThatWouldBePreempted =
+        Resources.add(alreadyConsideringForPreemption,
+            container.getAllocatedResource());
+
     Resources.subtractFrom(
-        usageAfterPreemption, container.getAllocatedResource());
+        usageAfterPreemption, totalThatWouldBePreempted);
 
     return !isUsageBelowShare(usageAfterPreemption, getFairShare());
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index b3e59c53dae..3b12ea92515 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -29,7 +30,9 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.Lock;
@@ -134,6 +137,14 @@ public void run() {
           containersToPreempt.addAll(bestContainers.containers);
           // Reserve the containers for the starved app
           trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
+          // Warn application about containers to be killed
+          for (RMContainer container : bestContainers.containers) {
+            FSAppAttempt app = scheduler.getSchedulerApp(
+                container.getApplicationAttemptId());
+            LOG.info("Preempting container " + container +
+                " from queue " + app.getQueueName());
+            app.trackContainerForPreemption(container);
+          }
         }
       }
     } // End of iteration over RRs
@@ -167,17 +178,29 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
         Resources.clone(node.getUnallocatedResource()),
         node.getTotalReserved());
 
+    Map<ApplicationId, Resource> consideringForPreemption = new HashMap<>();
+
     for (RMContainer container : containersToCheck) {
       FSAppAttempt app =
           scheduler.getSchedulerApp(container.getApplicationAttemptId());
+      ApplicationId appId = app.getApplicationId();
+
+      consideringForPreemption.putIfAbsent(appId, Resources.none());
 
-      if (app.canContainerBePreempted(container)) {
+      if (app.canContainerBePreempted(container,
+          consideringForPreemption.get(appId))) {
         // Flag container for preemption
         if (!preemptableContainers.addContainer(container)) {
           return null;
         }
 
-        Resources.addTo(potential, container.getAllocatedResource());
+        Resource containerResources = container.getAllocatedResource();
+        Resources.addTo(potential, containerResources);
+        Resource planningToPreemptBeforeThisContainer =
+            consideringForPreemption.get(appId);
+        consideringForPreemption.put(appId,
+            Resources.add(planningToPreemptBeforeThisContainer,
+                containerResources));
       }
 
       // Check if we have already identified enough containers
@@ -199,15 +222,6 @@ private void trackPreemptionsAgainstNode(List<RMContainer> containers,
   }
 
   private void preemptContainers(List<RMContainer> containers) {
-    // Warn application about containers to be killed
-    for (RMContainer container : containers) {
-      ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
-      FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      LOG.info("Preempting container " + container +
-          " from queue " + app.getQueueName());
-      app.trackContainerForPreemption(container);
-    }
-
     // Schedule timer task to kill containers
     preemptionTimer.schedule(
         new PreemptContainersTask(containers), warnTimeBeforeKill);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 31630240e28..ac5d9fe7afa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -278,11 +278,12 @@ private void submitApps(String queue1, String queue2)
     preemptHalfResources(queue2);
   }
 
-  private void verifyPreemption(int numStarvedAppContainers)
+  private void verifyPreemption(int numStarvedAppContainers,
+      int numGreedyAppContainers)
       throws InterruptedException {
     // Sleep long enough for four containers to be preempted.
     for (int i = 0; i < 1000; i++) {
-      if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
+      if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
         break;
       }
       Thread.sleep(10);
     }
@@ -290,12 +291,12 @@ private void verifyPreemption(int numStarvedAppContainers)
 
     // Post preemption, verify the greedyApp has the correct # of containers.
     assertEquals("Incorrect # of containers on the greedy app",
-        2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
+        numGreedyAppContainers, greedyApp.getLiveContainers().size());
 
     // Verify the queue metrics are set appropriately. The greedyApp started
     // with 8 1GB, 1vcore containers.
     assertEquals("Incorrect # of preempted containers in QueueMetrics",
-        8 - 2 * numStarvedAppContainers,
+        8 - numGreedyAppContainers,
         greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
 
     // Verify the node is reserved for the starvingApp
@@ -340,7 +341,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
-      verifyPreemption(2);
+      verifyPreemption(2, 4);
     } else {
       verifyNoPreemption();
     }
@@ -349,13 +350,13 @@
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
   }
 
   @Test
@@ -389,7 +390,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
 
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
 
     ArrayList<RMContainer> containers =
         (ArrayList<RMContainer>) starvingApp.getLiveContainers();
@@ -402,6 +403,22 @@
   }
 
   @Test
+  public void testAppNotPreemptedBelowFairShare() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    tryPreemptMoreThanFairShare("root.preemptable.child-2");
+  }
+
+  private void tryPreemptMoreThanFairShare(String queueName)
+      throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(3 * GB, 3, queueName, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
+
+    verifyPreemption(1, 5);
+  }
+
+  @Test
   public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
       throws InterruptedException {
     // Run this test only for fairshare preemption
@@ -414,10 +431,10 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
 
     // Submit a job so half the resources go to parent's sibling
     preemptHalfResources("root.preemptable-sibling");
-    verifyPreemption(2);
+    verifyPreemption(2, 4);
 
     // Submit a job to the child's sibling to force preemption from the child
     preemptHalfResources("root.preemptable.child-2");
-    verifyPreemption(1);
+    verifyPreemption(1, 2);
   }
 }
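
Reviewer note: the behavioral change is easiest to see in isolation. Before this patch,
FSPreemptionThread judged each container on a node independently, so several containers
belonging to the same application could each pass the fair-share check even though
preempting all of them together would drive that application below its fair share. The
sketch below models the fixed accounting with a simplified one-dimensional resource
(memory in MB) in place of YARN's Resource/Resources machinery. It is a minimal
illustration, not code from this patch: App, Container, canPreempt, and selectPreemptable
are hypothetical names invented for the example.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreemptionAccountingSketch {

  /** Simplified stand-in for an app attempt: current usage and fair share. */
  static class App {
    final String id;
    final long usedMb;
    final long fairShareMb;

    App(String id, long usedMb, long fairShareMb) {
      this.id = id;
      this.usedMb = usedMb;
      this.fairShareMb = fairShareMb;
    }
  }

  /** Simplified stand-in for a container allocation. */
  static class Container {
    final App app;
    final long allocatedMb;

    Container(App app, long allocatedMb) {
      this.app = app;
      this.allocatedMb = allocatedMb;
    }
  }

  /**
   * Mirrors the patched FSAppAttempt#canContainerBePreempted: subtract both
   * this container AND everything already being considered for preemption
   * from the same app, then require that the app would still be at or above
   * its fair share afterwards.
   */
  static boolean canPreempt(Container c, long alreadyConsideredMb) {
    long usageAfterPreemption =
        c.app.usedMb - (alreadyConsideredMb + c.allocatedMb);
    return usageAfterPreemption >= c.app.fairShareMb;
  }

  /**
   * Mirrors FSPreemptionThread#identifyContainersToPreemptOnNode: the app's
   * reported usage does not change during this planning pass (nothing has
   * been killed yet), so a per-app running total of already-flagged
   * resources must be fed into every subsequent check.
   */
  static List<Container> selectPreemptable(List<Container> candidates) {
    Map<String, Long> consideringForPreemption = new HashMap<>();
    List<Container> selected = new ArrayList<>();
    for (Container c : candidates) {
      long already = consideringForPreemption.getOrDefault(c.app.id, 0L);
      if (canPreempt(c, already)) {
        selected.add(c);
        consideringForPreemption.put(c.app.id, already + c.allocatedMb);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // A greedy app holds 8 x 1GB containers with a 4GB fair share. Judged in
    // isolation, every container looks preemptable; with the running total,
    // selection stops once the app would drop below its fair share.
    App greedy = new App("app_1", 8 * 1024, 4 * 1024);
    List<Container> onNode = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      onNode.add(new Container(greedy, 1024));
    }
    System.out.println(selectPreemptable(onNode).size()); // prints 4, not 8
  }
}

This is the invariant the new testAppNotPreemptedBelowFairShare exercises: the starved
app asks for more than its fair share, and verifyPreemption(1, 5) checks that only one
container is actually preempted, leaving the greedy app with 5 containers rather than
being cut down below its fair share.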