diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 33e4afc373d..6e0d7d7b3c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -44,12 +44,12 @@ LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; - FifoCandidatesSelector( - CapacitySchedulerPreemptionContext preemptionContext) { + FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, + boolean includeReservedResource) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, false); + preemptionContext, includeReservedResource); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 8f5907882e3..a99bc5ca9f0 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -223,7 +223,27 @@ public void init(Configuration config, RMContext context, } // initialize candidates preemption selection policies - candidatesSelectionPolicies.add(new FifoCandidatesSelector(this)); + // When selecting candidates for reserved containers is enabled, exclude + // reserved resource in the fifo policy (less aggressive). Otherwise include + // reserved resource. + // + // Why are we doing this? In YARN-4390 we added preemption-based-on-reserved-container + // support to reduce unnecessary preemption of large containers: we do + // not include reserved resources while calculating ideal-allocation in + // FifoCandidatesSelector. + // + // The changes in YARN-4390 significantly reduce the number of containers preempted + // when the cluster has heterogeneous container requests. (Please check the test + // report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf) + // + // However, on the other hand, in some corner cases, especially on a + // fragmented cluster, it could prevent preemption from kicking in at + // all. Please see YARN-5731. + // + // So to solve the problem, we include reserved resources when surgical preemption + // for reserved containers is disabled, which restores the pre-YARN-4390 behavior. 
+ candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, + !selectCandidatesForResevedContainers)); // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 13167b666d9..dd704efe907 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -989,7 +989,7 @@ public void testHierarchicalLarge3LevelsWithReserved() { ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC))); assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemorySize()); //2nd level child(E) preempts 10, but parent A has only 9 extra //check the parent can prempt only the extra from > 2 level child diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java index bd9f6155b9f..d2bf7f24ca1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -129,9 +129,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app, public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, ApplicationAttemptId appId, int expected) throws InterruptedException { int waitNum = 0; + int total = 0; while (waitNum < 500) { - int total = 0; + total = 0; for (RMContainer c : node.getCopiedListOfRunningContainers()) { if (c.getApplicationAttemptId().equals(appId)) { total++; @@ -144,6 +145,8 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, waitNum++; } - Assert.fail(); + Assert.fail( + "Check #live-container-on-node-from-app, actual=" + total + " expected=" + + expected); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 50674125565..047f17d84df 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase { @@ -243,4 +245,100 @@ public void testSurgicalPreemptionWithAvailableResource() rm1.close(); } + + @Test(timeout = 60000) + public void testPreemptionForFragmentatedCluster() + throws Exception { + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + false); + + /** + * Two queues, a/b, each of them has 50% of the cluster capacity. + * 5 nodes in the cluster, each of them is 30G. + * + * Submit first app, AM = 3G, and 4 * 21G containers. + * Submit second app, AM = 3G, and 4 * 21G containers. + * + * We can get one container preempted from the 1st app. 
+ */ + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + this.conf); + conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 1024 * 21); + conf.setQueues("root", new String[] { "a", "b" }); + conf.setCapacity("root.a", 50); + conf.setUserLimitFactor("root.a", 100); + conf.setCapacity("root.b", 50); + conf.setUserLimitFactor("root.b", 100); + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB)); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0)); + + am1.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App1 should have 5 containers now + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2)); + + am2.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App2 should have 2 containers now + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + 
am2.getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + waitNumberOfReservedContainersFromApp(schedulerApp2, 1); + + + // Call editSchedule twice, then keep allocating until the preempted resource is reassigned + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + int tick = 0; + while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) { + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + tick++; + Thread.sleep(100); + } + Assert.assertEquals(3, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } }