diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index a4d69bf..b8550b5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.After; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.junit.Before; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -74,18 +73,6 @@ public TestFairSchedulerPreemption(String name, boolean fairshare) throws IOException { fairsharePreemption = fairshare; - writeAllocFile(); - } - - @Before - public void setup() throws IOException { - createConfiguration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, - ALLOC_FILE.getAbsolutePath()); - conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); - conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); - conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0); - setupCluster(); } @After @@ -109,6 +96,8 @@ private void writeAllocFile() throws IOException { * |--- child-1 * |--- child-2 */ + String minResource = "4096mb,4vcores"; + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); @@ -118,12 +107,12 @@ private void writeAllocFile() throws IOException { // Child-1 out.println(""); - writeResourceParams(out); + writeMinResourceParams(out, minResource); out.println(""); // Child-2 out.println(""); - writeResourceParams(out); + writeMinResourceParams(out, minResource); out.println(""); out.println(""); // end of preemptable queue @@ -136,12 +125,12 @@ private void writeAllocFile() throws IOException { // Child-1 out.println(""); - writeResourceParams(out); + writeMinResourceParams(out, minResource); out.println(""); // Child-2 out.println(""); - writeResourceParams(out); + writeMinResourceParams(out, minResource); out.println(""); out.println(""); // end of nonpreemptable queue @@ -165,13 +154,20 @@ private void writePreemptionParams(PrintWriter out) { } } - private void writeResourceParams(PrintWriter out) { + private void writeMinResourceParams(PrintWriter out, String minResource) { if (!fairsharePreemption) { - out.println("4096mb,4vcores"); + out.println("" + minResource + ""); } } private void setupCluster() throws IOException { + createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0); + resourceManager = new MockRM(conf); scheduler = (FairScheduler) resourceManager.getResourceScheduler(); scheduler.setClock(clock); @@ -180,14 +176,6 @@ private void setupCluster() throws IOException { // Create and add two nodes to the cluster addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); - - // Verify if child-1 and child-2 are preemptable - FSQueue child1 = - scheduler.getQueueManager().getQueue("nonpreemptable.child-1"); - assertFalse(child1.isPreemptable()); - FSQueue child2 = - scheduler.getQueueManager().getQueue("nonpreemptable.child-2"); - assertFalse(child2.isPreemptable()); } private void sendEnoughNodeUpdatesToAssignFully() { @@ -286,6 +274,8 @@ private void verifyNoPreemption() throws InterruptedException { @Test public void testPreemptionWithinSameLeafQueue() throws Exception { + writeAllocFile(); + setupCluster(); String queue = "root.preemptable.child-1"; submitApps(queue, queue); if (fairsharePreemption) { @@ -297,18 +287,24 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { @Test public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { + writeAllocFile(); + setupCluster(); submitApps("root.preemptable.child-1", "root.preemptable.child-2"); verifyPreemption(); } @Test public void testPreemptionBetweenNonSiblingQueues() throws Exception { + writeAllocFile(); + setupCluster(); submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); verifyPreemption(); } @Test public void testNoPreemptionFromDisallowedQueue() throws Exception { + writeAllocFile(); + setupCluster(); submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1"); verifyNoPreemption(); } @@ -334,6 +330,8 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) { @Test public void testPreemptionSelectNonAMContainer() throws Exception { + writeAllocFile(); + setupCluster(); takeAllResources("root.preemptable.child-1"); setNumAMContainersPerNode(2); preemptHalfResources("root.preemptable.child-2"); @@ -349,4 +347,61 @@ public void testPreemptionSelectNonAMContainer() throws Exception { assertTrue("Preempted containers should come from two different " + "nodes.", !host0.equals(host1)); } + + @Test + public void testPreemptionIfCheckChildQueuesWhenParentQueueUnderFairShare() + throws Exception { + String minResource = "2048mb,2vcores"; + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + writePreemptionParams(out); + out.println(" "); + writeMinResourceParams(out, minResource); + out.println(" "); + out.println(" "); + writeMinResourceParams(out, minResource); + out.println(" "); + out.println(""); + out.println(""); + writePreemptionParams(out); + writeMinResourceParams(out, minResource); + out.println(""); + out.println(""); + out.close(); + + setupCluster(); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(GB, 1, "queueA.queueA1", + "default", 4); + ApplicationAttemptId app2 = createSchedulingRequest(GB, 1, "queueB", + "default", 4); + scheduler.update(); + sendEnoughNodeUpdatesToAssignFully(); + + // verify if the apps got the containers they requested + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 + ApplicationAttemptId app3 = createSchedulingRequest(GB, 1, "queueA.queueA2", + "default", 2); + clock.tickSec(1); + scheduler.update(); + + // Sleep long enough to ensure preemption happen. + for (int i = 0; i < 100; i++) { + if (scheduler.getSchedulerApp(app3).getLiveContainers().size() == 2) { + break; + } + Thread.sleep(10); + } + + // Verify if containers required by queueA2 are preempted from queueA1 + // instead of queueB + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + } }