diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 33e4afc373d..6e0d7d7b3c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -44,12 +44,12 @@ LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; - FifoCandidatesSelector( - CapacitySchedulerPreemptionContext preemptionContext) { + FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, + boolean includeReservedResource) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, false); + preemptionContext, includeReservedResource); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 8f5907882e3..ee6a27d03b7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -222,8 +222,13 @@ public void init(Configuration config, RMContext context, .add(new ReservedContainerCandidatesSelector(this)); } + boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean( + CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, + CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS); + // initialize candidates preemption selection policies - candidatesSelectionPolicies.add(new FifoCandidatesSelector(this)); + candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, + additionalPreemptionBasedOnReservedResource)); // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 38b538c1942..dfc1d24b5bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1103,6 +1103,33 @@ public boolean getLazyPreemptionEnabled() { 0.2f; /** + * By default, reserved resource will be excluded while balancing capacities + * of queues. + * + * Why do this? In YARN-4390, we added preemption-based-on-reserved-container + * support, to reduce unnecessary preemption for large containers. We will + * not include reserved resources while calculating ideal-allocation in + * FifoCandidatesSelector. + * + * Changes in YARN-4390 will significantly reduce the number of containers preempted + * when the cluster has heterogeneous container requests. (Please check the test + * report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf) + * + * However, in some corner cases, especially for a + * fragmented cluster, it could lead to preemption being unable to kick in in some + * cases. Please see YARN-5731. + * + * So to solve the problem, we make this behavior configurable, and please + * note that it is an experimental option. + */ + public static final String + ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = + PREEMPTION_CONFIG_PREFIX + + "additional_res_balance_based_on_reserved_containers"; + public static final boolean + DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false; + + /** + * When calculating which containers to be preempted, we will try to preempt + * containers for reserved containers first. By default is false.
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java index bd9f6155b9f..d2bf7f24ca1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -129,9 +129,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app, public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, ApplicationAttemptId appId, int expected) throws InterruptedException { int waitNum = 0; + int total = 0; while (waitNum < 500) { - int total = 0; + total = 0; for (RMContainer c : node.getCopiedListOfRunningContainers()) { if (c.getApplicationAttemptId().equals(appId)) { total++; @@ -144,6 +145,8 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, waitNum++; } - Assert.fail(); + Assert.fail( + "Check #live-container-on-node-from-app, actual=" + total + " expected=" + + expected); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 50674125565..908e0e88f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase { @@ -243,4 +245,100 @@ public void testSurgicalPreemptionWithAvailableResource() rm1.close(); } + + @Test(timeout = 60000) + public void testPreemptionForFragmentatedCluster() throws Exception { + // Set additional_balance_queue_based_on_reserved_res to true to get + // additional preemptions. + conf.setBoolean( + CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, + true); + + /** + * Two queues, a/b, each of them are 50/50 + * 5 nodes in the cluster, each of them is 30G. + * + * Submit first app, AM = 3G, and 4 * 21G containers. + * Submit second app, AM = 3G, and 4 * 21G containers, + * + * We can get one container preempted from 1st app. 
+ */ + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + this.conf); + conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 1024 * 21); + conf.setQueues("root", new String[] { "a", "b" }); + conf.setCapacity("root.a", 50); + conf.setUserLimitFactor("root.a", 100); + conf.setCapacity("root.b", 50); + conf.setUserLimitFactor("root.b", 100); + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB)); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0)); + + am1.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App1 should have 5 containers now + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2)); + + am2.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App2 should have 2 containers now + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + 
am2.getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + waitNumberOfReservedContainersFromApp(schedulerApp2, 1); + + // Call editSchedule twice and allocation once, container should get allocated + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + int tick = 0; + while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) { + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + tick++; + Thread.sleep(100); + } + Assert.assertEquals(3, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } }