From 228366684d267653958cf8007c080680fe9b6a9f Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 13 Jul 2017 16:48:29 +0530 Subject: [PATCH] YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. Contributed by Wangda Tan. (cherry picked from commit cf0d0844d6ae25d537391edb9b65fca05d1848e6) Addendum patch for YARN-5731 (cherry picked from commit 0b7afc060c2024a882bd1934d0f722bfca731742) --- .../monitor/capacity/FifoCandidatesSelector.java | 6 +- .../ProportionalCapacityPreemptionPolicy.java | 7 +- .../capacity/CapacitySchedulerConfiguration.java | 27 + .../CapacitySchedulerPreemptionTestBase.java | 7 +- .../TestCapacitySchedulerSurgicalPreemption.java | 664 +++++++++++++++++++++ 5 files changed, 705 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 33e4afc..6e0d7d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -44,12 +44,12 @@ LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; - FifoCandidatesSelector( - CapacitySchedulerPreemptionContext preemptionContext) { + FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, + boolean includeReservedResource) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, false); + preemptionContext, includeReservedResource); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 8f59078..ee6a27d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -222,8 +222,13 @@ public void init(Configuration config, RMContext context, .add(new ReservedContainerCandidatesSelector(this)); } + boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean( + CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, + CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS); + // initialize candidates preemption selection policies - candidatesSelectionPolicies.add(new FifoCandidatesSelector(this)); + 
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, + additionalPreemptionBasedOnReservedResource)); // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index f2f9001..642a05b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1094,6 +1094,33 @@ public boolean getLazyPreemptionEnabled() { 0.2f; /** + * By default, reserved resource will be excluded while balancing capacities + * of queues. + * + * Why doing this? In YARN-4390, we added preemption-based-on-reserved-container + * Support. To reduce unnecessary preemption for large containers. We will + * not include reserved resources while calculating ideal-allocation in + * FifoCandidatesSelector. + * + * Changes in YARN-4390 will significantly reduce number of containers preempted + * When cluster has heterogeneous container requests. (Please check test + * report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf + * + * However, on the other hand, in some corner cases, especially for + * fragmented cluster. It could lead to preemption cannot kick in in some + * cases. Please see YARN-5731. + * + * So to solve the problem, make this change to be configurable, and please + * note that it is an experimental option. + */ + public static final String + ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = + PREEMPTION_CONFIG_PREFIX + + "additional_res_balance_based_on_reserved_containers"; + public static final boolean + DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false; + + /** * When calculating which containers to be preempted, we will try to preempt * containers for reserved containers first. By default is false. 
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java index bd9f615..d2bf7f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java @@ -129,9 +129,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app, public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, ApplicationAttemptId appId, int expected) throws InterruptedException { int waitNum = 0; + int total = 0; while (waitNum < 500) { - int total = 0; + total = 0; for (RMContainer c : node.getCopiedListOfRunningContainers()) { if (c.getApplicationAttemptId().equals(appId)) { total++; @@ -144,6 +145,8 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node, waitNum++; } - Assert.fail(); + Assert.fail( + "Check #live-container-on-node-from-app, actual=" + total + " expected=" + + expected); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 5067412..64e3281 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -38,6 +38,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Set; public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase { @@ -243,4 +245,666 @@ public void testSurgicalPreemptionWithAvailableResource() rm1.close(); } + + @Test(timeout = 60000) + public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
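+     * (Cluster total = 2 * 20G = 40G, so the guaranteed capacities are
+     * a = 4G, b = 8G and c = 28G.)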
+ * + * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. + * + * 2) app1 submit to queue-b first, it asked 6 * 1G containers + * We will allocate 4 on n1 (including AM) and 3 on n2. + * + * 3) app2 submit to queue-c, ask for one 18G container (for AM) + * + * After preemption, we should expect: + * Preempt 3 containers from app1 and AM of app2 successfully allocated. + */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<>()); + + // Do allocation for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, so the abs-used-cap of b is + // 7 / 40 = 17.5% < 20% (guaranteed) + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + // 4 from n1 and 3 from n2 + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), + am1.getApplicationAttemptId(), 4); + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), + am1.getApplicationAttemptId(), 3); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c"); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(10); + } + + // Call editSchedule immediately: containers are not selected + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Sleep the timeout interval, we should be able to see containers selected + Thread.sleep(1000); + editPolicy.editSchedule(); + Assert.assertEquals(2, editPolicy.getToPreemptContainers().size()); + + // Call editSchedule again: selected containers are killed, and new AM + // container launched + editPolicy.editSchedule(); + + // Do allocation till reserved container allocated + while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(10); + } + + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + + rm1.close(); + } + + @Test(timeout = 300000) + public void testPriorityPreemptionRequiresMoveReservation() + throws 
Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues,
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
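+     * (Cluster total = 3 * 10G = 30G, so the guaranteed capacities are
+     * a = 3G, b = 6G and c = 21G.)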
+ * + * 1) 3 nodes in the cluster, 10G for each + * + * 2) app1 submit to queue-b first, it asked 2G each, + * it can get 2G on n1 (AM), 2 * 2G on n2 + * + * 3) app2 submit to queue-c, with 2G AM container (allocated on n3) + * app2 requires 9G resource, which will be reserved on n3 + * + * We should expect container unreserved from n3 and allocated on n1/n2 + */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); + MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 2 * GB, 2, new ArrayList<>()); + + // Do allocation for node2 twice + for (int i = 0; i < 2; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(3, schedulerApp1.getLiveContainers().size()); + + // 1 from n1 and 2 from n2 + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), + am1.getApplicationAttemptId(), 1); + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), + am1.getApplicationAttemptId(), 2); + + // Submit app2 to queue-c and asks for a 2G container for AM, on n3 + RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Asks 1 * 9G container + am2.allocate("*", 9 * GB, 1, new ArrayList<>()); + + // Do allocation for node3 once + cs.handle(new NodeUpdateSchedulerEvent(rmNode3)); + + // Make sure container reserved on node3 + Assert.assertNotNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + + // Call editSchedule immediately: nothing happens + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + Assert.assertNotNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + + // Sleep the timeout interval, we should be able to see reserved container + // moved to n2 (n1 occupied by AM) + Thread.sleep(1000); + editPolicy.editSchedule(); + Assert.assertNull( + cs.getNode(rmNode3.getNodeID()).getReservedContainer()); + Assert.assertNotNull( + cs.getNode(rmNode2.getNodeID()).getReservedContainer()); + Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode( + 
rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId()); + + // Do it again, we should see containers marked to be preempt + editPolicy.editSchedule(); + Assert.assertEquals(2, editPolicy.getToPreemptContainers().size()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + + // Do allocation till reserved container allocated + while (schedulerApp2.getLiveContainers().size() < 2) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + Thread.sleep(200); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + + rm1.close(); + } + + @Test(timeout = 60000) + public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * 
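+     * (Cluster total = 10 * 10G = 100G, so the guaranteed capacities are
+     * a = 10G, b = 20G and c = 70G.)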
+ * + * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G. + * + * 2) app1 submit to queue-b first, it asked 8 * 1G containers + * We will allocate 1 container on each of n0-n10 + * + * 3) app2 submit to queue-c, ask for 10 * 10G containers (including AM) + * + * After preemption, we should expect: + * Preempt 7 containers from app1 and usage of app2 is 70% + */ + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // Queue c has higher priority than a/b + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM[] mockNMs = new MockNM[10]; + for (int i = 0; i < 10; i++) { + mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode[] rmNodes = new RMNode[10]; + for (int i = 0; i < 10; i++) { + rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId()); + } + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]); + + am1.allocate("*", 1 * GB, 8, new ArrayList<>()); + + // Do allocation for nm1-nm8 + for (int i = 1; i < 9; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // App1 should have 9 containers now, so the abs-used-cap of b is 9% + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(9, schedulerApp1.getLiveContainers().size()); + for (int i = 0; i < 9; i++) { + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()), + am1.getApplicationAttemptId(), 1); + } + + // Submit app2 to queue-c and asks for a 10G container for AM + // Launch AM in NM9 + RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Ask 10 * 10GB containers + am2.allocate("*", 10 * GB, 10, new ArrayList<>()); + + // Do allocation for all nms + for (int i = 1; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 reserved resource from nm1-nm9 + for (int i = 1; i < 9; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + } + + // Sleep the timeout interval, we should be able to see 6 containers selected + // 6 (selected) + 1 (allocated) which makes target capacity to 70% + Thread.sleep(1000); + + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + checkNumberOfPreemptionCandidateFromApp(editPolicy, 6, + am1.getApplicationAttemptId()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + // Do allocation for all nms + for (int i = 1; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp2, 7); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + 
rm1.close();
+  }
+
+  @Test(timeout = 600000)
+  public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
+      throws Exception {
+    /**
+     * Test case: Submit three applications (app1/app2/app3) to different
+     * queues, queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          45  45  10
+     * </pre>
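+     * (Cluster total = 5 * 4G = 20G, so the guaranteed capacities are
+     * a = 9G, b = 9G and c = 2G.)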
+ * + * Priority of queue_a = 1 + * Priority of queue_b = 2 + * + * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-c first (AM=1G), it asked 4 * 1G containers + * We will allocate 1 container on each of n0-n4. AM on n4. + * + * 3) app2 submit to queue-a, AM container=0.5G, allocated on n0 + * Ask for 2 * 3.5G containers. (Reserved on n0/n1) + * + * 4) app2 submit to queue-b, AM container=0.5G, allocated on n2 + * Ask for 2 * 3.5G containers. (Reserved on n2/n3) + * + * First we will preempt container on n2 since it is the oldest container of + * Highest priority queue (b) + */ + + // Total preemption = 1G per round, which is 5% of cluster resource (20G) + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + 0.05f); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true); + conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000); + conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, + CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); + + // A/B has higher priority + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM[] mockNMs = new MockNM[5]; + for (int i = 0; i < 5; i++) { + mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + RMNode[] rmNodes = new RMNode[5]; + for (int i = 0; i < 5; i++) { + rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId()); + } + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]); + + am1.allocate("*", 1 * GB, 4, new ArrayList<>()); + + // Do allocation for nm1-nm8 + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // App1 should have 5 containers now, one for each node + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + for (int i = 0; i < 5; i++) { + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()), + am1.getApplicationAttemptId(), 1); + } + + // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0) + RMApp app2 = rm1.submitApp(512, "app", "user", null, "a"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Ask 2 * 3.5GB containers + am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>()); + + // Do allocation for n0-n1 + for (int i = 0; i < 2; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 reserved resource from nm0-nm1 + for (int i = 0; i < 2; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) + .getReservedContainer().getQueueName(), 
"a"); + } + + // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2) + RMApp app3 = rm1.submitApp(512, "app", "user", null, "b"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]); + FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app3.getApplicationId(), 1)); + + // Ask 2 * 3.5GB containers + am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>()); + + // Do allocation for n2-n3 + for (int i = 2; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + // Check am2 reserved resource from nm2-nm3 + for (int i = 2; i < 4; i++) { + Assert.assertNotNull("Should reserve on nm-" + i, + cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); + Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) + .getReservedContainer().getQueueName(), "b"); + } + + // Sleep the timeout interval, we should be able to see 1 container selected + Thread.sleep(1000); + + /* 1st container preempted is on n2 */ + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[2] + Set selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[2].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); + + // Make sure the container killed, then do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 4); + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + waitNumberOfLiveContainersFromApp(schedulerApp3, 2); + + /* 2nd container preempted is on n3 */ + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[3] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[3].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 3); + waitNumberOfLiveContainersFromApp(schedulerApp2, 1); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + /* 3rd container preempted is on n0 */ + editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[0] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[0].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 2); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 2); + waitNumberOfLiveContainersFromApp(schedulerApp2, 2); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + /* 4th container preempted is on n1 */ + 
editPolicy.editSchedule(); + + // We should have one to-preempt container, on node[0] + selectedToPreempt = + editPolicy.getToPreemptContainers().keySet(); + Assert.assertEquals(1, selectedToPreempt.size()); + Assert.assertEquals(mockNMs[1].getNodeId(), + selectedToPreempt.iterator().next().getAllocatedNode()); + + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + + // Do allocation for all nms + for (int i = 0; i < 4; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i])); + } + + waitNumberOfLiveContainersFromApp(schedulerApp1, 1); + waitNumberOfLiveContainersFromApp(schedulerApp2, 3); + waitNumberOfLiveContainersFromApp(schedulerApp3, 3); + + rm1.close(); + } + + @Test(timeout = 60000) + public void testPreemptionForFragmentatedCluster() throws Exception { + // Set additional_balance_queue_based_on_reserved_res to true to get + // additional preemptions. + conf.setBoolean( + CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, + true); + + /** + * Two queues, a/b, each of them are 50/50 + * 5 nodes in the cluster, each of them is 30G. + * + * Submit first app, AM = 3G, and 4 * 21G containers. + * Submit second app, AM = 3G, and 4 * 21G containers, + * + * We can get one container preempted from 1st app. + */ + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + this.conf); + conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 1024 * 21); + conf.setQueues("root", new String[] { "a", "b" }); + conf.setCapacity("root.a", 50); + conf.setUserLimitFactor("root.a", 100); + conf.setCapacity("root.b", 50); + conf.setUserLimitFactor("root.b", 100); + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB)); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0)); + + am1.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App1 should have 5 containers now + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2)); + + am2.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App2 should have 2 containers now + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + waitNumberOfReservedContainersFromApp(schedulerApp2, 1); + + // Call editSchedule twice and allocation once, 
container should get allocated + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + int tick = 0; + while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) { + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + tick++; + Thread.sleep(100); + } + Assert.assertEquals(3, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + } -- 2.10.1 (Apple Git-78)
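Usage sketch for the new switch introduced by this patch: it is off by default, and the last test above (testPreemptionForFragmentatedCluster) enables it through the CapacitySchedulerConfiguration API. The snippet below is a minimal sketch mirroring that call; the capacity-scheduler.xml key in the comment assumes the standard preemption prefix (yarn.resourcemanager.monitor.capacity.preemption.), which the patch itself only references as PREEMPTION_CONFIG_PREFIX.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    // Enable the experimental reserved-container based balancing (default: false).
    // Assumed equivalent capacity-scheduler.xml property:
    //   yarn.resourcemanager.monitor.capacity.preemption.additional_res_balance_based_on_reserved_containers = true
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(new Configuration());
    csConf.setBoolean(
        CapacitySchedulerConfiguration
            .ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
        true);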