diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index d6282a17d79..151b88cf0d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -100,6 +100,7 @@ public static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; public static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; public static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + public static final String E = CapacitySchedulerConfiguration.ROOT + ".e"; public static final String A1 = A + ".a1"; public static final String A2 = A + ".a2"; public static final String B1 = B + ".b1"; @@ -129,8 +130,8 @@ public static final String USER = "user_"; public static final String USER0 = USER + 0; public static final String USER1 = USER + 1; - public static final String USER3 = USER + 3; public static final String USER2 = USER + 2; + public static final String USER3 = USER + 3; public static final String PARENT_QUEUE = "c"; public static final Set accessibleNodeLabelsOnC = new HashSet<>(); @@ -183,7 +184,7 @@ public void setUp() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0,1,2,3}); dispatcher = new SpyDispatcher(); rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); @@ -225,27 +226,33 @@ protected void setupNodes(MockRM newMockRM) throws Exception { } public static CapacitySchedulerConfiguration setupQueueMappings( - CapacitySchedulerConfiguration conf) { + CapacitySchedulerConfiguration conf, String parentQueue, boolean + overrideWithQueueMappings, int[] userIds) { List queuePlacementRules = new ArrayList<>(); queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); conf.setQueuePlacementRules(queuePlacementRules); + List existingMappings = conf + .getQueueMappings(); + //set queue mapping List queueMappings = new ArrayList<>(); - for (int i = 0; i <= 3; i++) { + for (int i = 0; i < userIds.length; i++) { //Set C as parent queue name for auto queue creation UserGroupMappingPlacementRule.QueueMapping userQueueMapping = new UserGroupMappingPlacementRule.QueueMapping( UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, - USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); + USER + userIds[i], getQueueMapping(parentQueue, USER + + userIds[i])); queueMappings.add(userQueueMapping); } - conf.setQueueMappings(queueMappings); + existingMappings.addAll(queueMappings); + conf.setQueueMappings(existingMappings); //override with queue mappings - conf.setOverrideWithQueueMappings(true); + conf.setOverrideWithQueueMappings(overrideWithQueueMappings); return conf; } @@ -327,6 +334,29 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( return conf; } + public static CapacitySchedulerConfiguration + setupQueueConfigurationForSingleAutoCreatedLeafQueue( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "c" }); + conf.setCapacity(C, 100f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + return conf; + } + @After public void tearDown() throws Exception { if (mockRM != null) { @@ -395,7 +425,7 @@ protected MockRM setupSchedulerInstance() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0,1,2,3}); RMNodeLabelsManager mgr = setupNodeLabelManager(conf); MockRM newMockRM = new MockRM(conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java new file mode 100644 index 00000000000..202a2d5d025 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java @@ -0,0 +1,206 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity + .ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.C; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.D; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.E; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER0; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER3; + +public class TestCapacitySchedulerAutoCreatedQueuePreemption + extends TestCapacitySchedulerSurgicalPreemption { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + public static CapacitySchedulerConfiguration + setupQueueConfigurationForSimpleSurgicalPreemption( + CapacitySchedulerConfiguration conf) { + + + //set up auto created queue configs + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "c", + true, new int[] {1,2}); + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "c" }); + conf.setCapacity(C, 100f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 30.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + return conf; + } + + protected CapacitySchedulerConfiguration + setupQueueConfigurationForPriorityBasedPreemption( + CapacitySchedulerConfiguration conf) { + + //set up auto created queue configs + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "c", + true, new int[] {1,2}); + + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "d", + true, new int[] {3,4}); + + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "e", + true, new int[] {0}); + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "c", "d", "e" }); + conf.setCapacity(C, 45f); + conf.setCapacity(D, 45f); + conf.setCapacity(E, 10f); + + conf.setUserLimitFactor(E, 3.0f); + conf.setUserLimitFactor(C, 3.0f); + conf.setUserLimitFactor(D, 3.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + conf.setAutoCreateChildQueueEnabled(D, true); + conf.setAutoCreateChildQueueEnabled(E, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + + conf.setAutoCreatedLeafQueueConfigCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(D, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(D, 3.0f); + + conf.setAutoCreatedLeafQueueConfigCapacity(E, 100.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(E, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(E, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(E, 3.0f); + + + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".d", 2); + + return conf; + } + + @Test(timeout = 60000) + public void testSimpleSurgicalPreemptionOnAutoCreatedLeafQueues() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     *                    C
+     *            /       |     \
+     *           USER1   USER2   USER3
+     *          30      30        30
+     * 
+ * + * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. + * + * 2) app1 submit to queue-USER1 first, it asked 32 * 1G containers + * We will allocate 16 on n1 and 16 on n2. + * + * 3) app2 submit to queue-USER2, ask for one 1G container (for AM) + * + * 4) app2 asks for another 6G container, it will be reserved on n1 + * + * Now: we have: + * n1: 17 from app1, 1 from app2, and 1 reserved from app2 + * n2: 16 from app1. + * + * After preemption, we should expect: + * Preempt 4 containers from app1 on n1. + */ + setupQueueConfigurationForSimpleSurgicalPreemption(conf); + testSimpleSurgicalPreemption(USER1, USER2, USER1, USER2); + } + + @Test(timeout = 600000) + public void + testPriorityPreemptionFromHighestPriorityManagedParentQueueAndOldestContainer() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     *             Root
+     *            /  |  \
+     *           c   d   e
+     *          45  45  10
+     * 
+ * + * Priority of queue_c = 1 + * Priority of queue_d = 2 + * + * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-e first (AM=1G), it asked 4 * 1G containers + * We will allocate 1 container on each of n0-n4. AM on n4. + * + * 3) app2 submit to queue-c, AM container=0.5G, allocated on n0 + * Ask for 2 * 3.5G containers. (Reserved on n0/n1) + * + * 4) app2 submit to queue-d, AM container=0.5G, allocated on n2 + * Ask for 2 * 3.5G containers. (Reserved on n2/n3) + * + * First we will preempt container on n2 since it is the oldest container of + * Highest priority queue (d) + */ + + // Total preemption = 1G per round, which is 5% of cluster resource (20G) + setupQueueConfigurationForPriorityBasedPreemption(conf); + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(new + String[] {USER1, USER3, USER0}, new String[] {USER1, USER3, USER0}); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 8a7e03f8562..c20e0914b14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; + +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels + .RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -86,8 +89,19 @@ public void testSimpleSurgicalPreemption() * After preemption, we should expect: * Preempt 4 containers from app1 on n1. */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); + testSimpleSurgicalPreemption("a", "c", "user", "user"); + } + + protected void testSimpleSurgicalPreemption(String queue1, String queue2, + String user1, String user2) + throws Exception { + + MockRM rm1 = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); @@ -97,7 +111,7 @@ public void testSimpleSurgicalPreemption() RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + RMApp app1 = rm1.submitApp(1 * GB, "app", user1, null, queue1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); am1.allocate("*", 1 * GB, 32, new ArrayList()); @@ -120,7 +134,7 @@ public void testSimpleSurgicalPreemption() // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + RMApp app2 = rm1.submitApp(1 * GB, "app", user2, null, queue2); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); // NM1/NM2 has available resource = 2G/4G @@ -632,6 +646,21 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() * Highest priority queue (b) */ + // A/B has higher priority + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a" , 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); + + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(new + String[] {"a", "b", "c"}, new String[] {"user", "user", "user"}); + + } + + protected void + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(String[] + queues, String[] users) throws Exception { // Total preemption = 1G per round, which is 5% of cluster resource (20G) conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, 0.05f); @@ -641,15 +670,11 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); - // A/B has higher priority - conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1); - conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); - - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); + MockRM rm1 = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; rm1.start(); MockNM[] mockNMs = new MockNM[5]; @@ -665,7 +690,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() } // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + RMApp app1 = rm1.submitApp(1 * GB, "app", users[2], null, queues[2]); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]); am1.allocate("*", 1 * GB, 4, new ArrayList<>()); @@ -685,7 +710,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() } // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0) - RMApp app2 = rm1.submitApp(512, "app", "user", null, "a"); + RMApp app2 = rm1.submitApp(512, "app", users[0], null, queues[0]); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]); FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); @@ -703,11 +728,11 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() Assert.assertNotNull("Should reserve on nm-" + i, cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) - .getReservedContainer().getQueueName(), "a"); + .getReservedContainer().getQueueName(), queues[0]); } // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2) - RMApp app3 = rm1.submitApp(512, "app", "user", null, "b"); + RMApp app3 = rm1.submitApp(512, "app", users[1], null, queues[1]); MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]); FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt( ApplicationAttemptId.newInstance(app3.getApplicationId(), 1)); @@ -725,7 +750,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() Assert.assertNotNull("Should reserve on nm-" + i, cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) - .getReservedContainer().getQueueName(), "b"); + .getReservedContainer().getQueueName(), queues[1]); } // Sleep the timeout interval, we should be able to see 1 container selected @@ -831,6 +856,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() rm1.close(); } + @Test(timeout = 60000) public void testPreemptionForFragmentatedCluster() throws Exception { // Set additional_balance_queue_based_on_reserved_res to true to get