diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index a8c62fd..22537f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -54,7 +54,7 @@ super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, false); + preemptionContext, true); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 5067412..2192884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class TestCapacitySchedulerSurgicalPreemption extends CapacitySchedulerPreemptionTestBase { @@ -243,4 +245,96 @@ public void testSurgicalPreemptionWithAvailableResource() rm1.close(); } + + @Test(timeout = 60000) + public void testSurgicalPreemptionForGS() + throws Exception { + /** + * Two queues, a/b, each of them are 50/50 + * 5 nodes in the cluster, each of them is 30G. + * + * Submit first app, AM = 3G, and 4 * 21G containers. + * Submit second app, AM = 3G, and 4 * 21G containers, + * + * We cannot get anything preempted from 1st app. + */ + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + this.conf); + conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + 1024 * 21); + conf.setQueues("root", new String[] { "a", "b" }); + conf.setCapacity("root.a", 50); + conf.setUserLimitFactor("root.a", 100); + conf.setCapacity("root.b", 50); + conf.setUserLimitFactor("root.b", 100); + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + List nms = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB)); + } + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0)); + + am1.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App1 should have 5 containers now + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(5, schedulerApp1.getLiveContainers().size()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2)); + + am2.allocate("*", 21 * GB, 4, new ArrayList()); + + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + + // App2 should have 2 containers now + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + waitNumberOfReservedContainersFromApp(schedulerApp2, 1); + + + // Call editSchedule twice and allocation once, container should get allocated + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + int tick = 0; + while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) { + // Do allocation for all nodes + for (int i = 0; i < 10; i++) { + MockNM mockNM = nms.get(i % nms.size()); + RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + } + tick++; + Thread.sleep(100); + } + Assert.assertEquals(3, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } }