events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId());
- assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
+ assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 0b32676..171196f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -264,6 +265,7 @@ public void testLimitsComputation() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
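+ // The queue code under test consults the PreemptionManager exposed by the
+ // CapacitySchedulerContext, so the mocked context has to return an instance.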
// Say cluster has 100 nodes of 16G each
Resource clusterResource =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 1569a12..d8161f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception {
if (++counter > 2) {
break;
}
- cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check node report, 12 GB used and 4 GB available
@@ -512,7 +512,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 2) {
break;
}
- cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+ cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
@@ -542,7 +542,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 1) {
break;
}
- cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+ cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index b6c005b..16ba607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -1188,7 +1188,7 @@ public void testPreemptionInfo() throws Exception {
// kill the 3 containers
for (Container c : allocatedContainers) {
- cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@@ -1197,7 +1197,7 @@ public void testPreemptionInfo() throws Exception {
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
// kill app0-attempt0 AM container
- cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
+ cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId()));
// wait for app0 failed
@@ -1220,7 +1220,7 @@ public void testPreemptionInfo() throws Exception {
allocatedContainers =
am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
for (Container c : allocatedContainers) {
- cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
+ cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId()));
}
// check values
@@ -1269,7 +1269,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
}
// Call killContainer to preempt the container
- cs.killPreemptedContainer(rmContainer);
+ cs.markContainerForKillable(rmContainer);
Assert.assertEquals(3, requests.size());
for (ResourceRequest request : requests) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
new file mode 100644
index 0000000..cbb231f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
@@ -0,0 +1,682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerPreemption {
+ private static final Log LOG = LogFactory.getLog(
+ TestCapacitySchedulerPreemption.class);
+
+ private final int GB = 1024;
+
+ private Configuration conf;
+
+ RMNodeLabelsManager mgr;
+
+ Clock clock;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+ conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+ // Set preemption related configurations
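+ // With lazy preemption enabled, the policy only marks containers as killable;
+ // the CapacityScheduler kills a marked container later, when another queue
+ // actually needs the resource. A kill wait of 0 lets the policy promote
+ // selected containers to killable on its next pass.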
+ conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
+ 0);
+ conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+ true);
+ conf.setFloat(
+ ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
+ conf.setFloat(
+ ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(this.conf);
+ clock = mock(Clock.class);
+ when(clock.getTime()).thenReturn(0L);
+ }
+
+ private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+ RMActiveServices activeServices = rm.getRMActiveService();
+ SchedulingMonitor mon = null;
+ for (Service service : activeServices.getServices()) {
+ if (service instanceof SchedulingMonitor) {
+ mon = (SchedulingMonitor) service;
+ break;
+ }
+ }
+
+ if (mon != null) {
+ return mon.getSchedulingEditPolicy();
+ }
+ return null;
+ }
+
+ @Test (timeout = 60000)
+ public void testSimplePreemption() throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ *
+ * 1) Two nodes in the cluster, each of them has 4G.
+ *
+ * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so no
+ * more resource is available.
+ *
+ * 3) app2 is submitted to queue-c and asks for one 1G container (for its AM)
+ *
+ * Now the cluster is full.
+ *
+ * 4) app2 asks for another 1G container; the system will preempt one container
+ * from app1, and app2 will receive a container on the freed resource
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 7, new ArrayList());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getUnallocatedResource().getMemory());
+
+ // AM asks for a 1 * GB container
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 is marked
+ // to be "killable"
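+ // (the first pass selects the container to preempt; the second pass, with the
+ // kill wait set to 0, moves it to the killable list)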
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(1, killableContainers.size());
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 6 containers, and app2 has 2 containers
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersNodeLocalityDelay()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for a 1G container with locality specified, so it needs
+ * to wait for missed opportunities before getting scheduled.
+ * Check that the system waits for missed opportunities before killing the killable container.
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getUnallocatedResource().getMemory());
+
+ // AM asks for a 1 * GB container with unknown host and unknown rack
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "unknownhost",
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, one container will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersHardNodeLocality()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for a 1G container with hard locality specified, and
+ * the requested host does not exist in the cluster.
+ * Confirm the system doesn't preempt any container.
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getUnallocatedResource().getMemory());
+
+ // AM asks for a 1 * GB container for h3 with hard locality,
+ // h3 doesn't exist in the cluster
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1, true), ResourceRequest
+ .newInstance(Priority.newInstance(1), "h3",
+ Resources.createResource(1 * GB), 1, false), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1, false)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 is marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, nothing will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 7 containers, and app2 has 1 container (no container allocated)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+ throws Exception {
+ /**
+ * Test case:
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Killable containers' resources will be excluded from PCPP (no duplicate
+ * container added to the killable list)
+ * 2) When more resources need to be preempted, new containers will be selected
+ * and killable containers will be considered
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ am2.allocate("*", 1 * GB, 1, new ArrayList());
+
+ // Get edit policy and do one update
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 is marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+ // Check killable containers and to-be-preempted containers in edit policy
+ Assert.assertEquals(1, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Run edit schedule again, confirm the status doesn't change
+ editPolicy.editSchedule();
+ Assert.assertEquals(1, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Save the current set of killable containers
+ Set<ContainerId> previousKillableContainers = new HashSet<>(
+ pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+ .keySet());
+
+ // Update request resource of c from 1 to 2, so we need to preempt
+ // one more container
+ am2.allocate("*", 1 * GB, 2, new ArrayList());
+
+ // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
+ // and 1 container in killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(1, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+ // Call editPolicy.editSchedule() once more, we should have 2 containers in the killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(2, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Check that the previous killable containers are included in the new killable containers
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ Assert.assertTrue(
+ Sets.difference(previousKillableContainers, killableContainers.keySet())
+ .isEmpty());
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyCleanupKillableContainersWhenContainerCompleted()
+ throws Exception {
+ /**
+ * Test case:
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Containers will be marked as killable
+ * 2) Release the killable container
+ * 3) To-preempt/killable containers will be updated accordingly
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ am2.allocate("*", 3 * GB, 1, new ArrayList());
+
+ // Get edit policy and do one update
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if 3 containers from app1 are marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+ Assert.assertEquals(3, editPolicy.getKillableContainers().size());
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+ // Save the current set of killable containers
+ Set<ContainerId> previousKillableContainers = new HashSet<>(
+ pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+ .keySet());
+
+ // Release one of killable containers
+ ContainerId killableContainerToRelease =
+ previousKillableContainers.iterator().next();
+ am1.allocate(null, Arrays.asList(killableContainerToRelease));
+
+ // Check killable-containers in scheduler side
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+ // Call editPolicy.editSchedule() once; the policy's killable map should now
+ // contain only the 2 remaining containers
+ editPolicy.editSchedule();
+ Assert.assertEquals(2, editPolicy.getKillableContainers().size());
+ // And killable container doesn't contain the released container
+ Assert.assertFalse(editPolicy.getKillableContainers()
+ .containsKey(killableContainerToRelease));
+
+ // Call editPolicy.editSchedule() once more; we should still have 2 containers in the killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(2, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Check new killable containers from scheduler
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ Assert.assertFalse(killableContainers.containsKey(killableContainerToRelease));
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+ throws Exception {
+ /**
+ * Test case:
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Containers will be marked as killable
+ * 2) Cancel resource request
+ * 3) Killable containers will be cancelled from policy and scheduler
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getUnallocatedResource().getMemory());
+ am2.allocate("*", 3 * GB, 1, new ArrayList());
+
+ // Get edit policy and do one update
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if 3 containers from app1 are marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+ Assert.assertEquals(3, editPolicy.getKillableContainers().size());
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+ // Cancel resource, no preemption required now
+ am2.allocate("*", 3 * GB, 0, new ArrayList());
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+
+ // Call editSchedule once more to make sure still nothing happens
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getKillableContainers().size());
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+ }
+
+ private Map<ContainerId, RMContainer> waitKillableContainersSize(
+ PreemptionManager pm, String queueName, String partition,
+ int expectedSize) throws InterruptedException {
+ Map<ContainerId, RMContainer> killableContainers =
+ pm.getKillableContainersMap(queueName, partition);
+
+ int wait = 0;
+ // Wait for at most 5 sec (it should be super fast actually)
+ while (expectedSize != killableContainers.size() && wait < 500) {
+ killableContainers = pm.getKillableContainersMap(queueName, partition);
+ Thread.sleep(10);
+ wait++;
+ }
+
+ Assert.assertEquals(expectedSize, killableContainers.size());
+ return killableContainers;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 5169337..1612201 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -99,6 +100,7 @@ public void setUp() throws Exception {
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 69b0813..87a3d51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -71,6 +71,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -150,6 +151,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
@@ -3092,6 +3094,7 @@ private CapacitySchedulerContext mockCSContext(
Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(2 * GB, 2));
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
return csContext;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index f73baa4..23dc860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,6 +93,7 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 9047138..29c7dea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -126,6 +127,7 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 2694957..4441c6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -356,4 +356,40 @@ public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
}
+
+ /**
+ * Get a queue structure:
+ *
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ *
+ */
+ public static Configuration
+ getConfigurationWithMultipleQueues(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b", "c" });
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 100);
+ conf.setUserLimitFactor(A, 100);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 20);
+ conf.setMaximumCapacity(B, 100);
+ conf.setUserLimitFactor(B, 100);
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ conf.setCapacity(C, 70);
+ conf.setMaximumCapacity(C, 100);
+ conf.setUserLimitFactor(C, 100);
+
+ return conf;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 8d7c22e..5c0aab5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -102,9 +103,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -3955,7 +3958,92 @@ public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured()
}
}
}
-
+
+ @Test(timeout = 5000)
+ public void testRecoverRequestAfterPreemption() throws Exception {
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ Priority priority = Priority.newInstance(20);
+ String host = "127.0.0.1";
+ int GB = 1024;
+
+ // Create a node and raise a Node Added event
+ RMNode node = MockNodes.newNodeInfo(1,
+ Resources.createResource(16 * 1024, 4), 0, host);
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ // Create 3 container requests and place them in ask
+ List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+ ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+ priority.getPriority(), 1, true);
+ ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+ node.getRackName(), priority.getPriority(), 1, true);
+ ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+ ResourceRequest.ANY, priority.getPriority(), 1, true);
+ ask.add(nodeLocalRequest);
+ ask.add(rackLocalRequest);
+ ask.add(offRackRequest);
+
+ // Create Request and update
+ ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+ "user1", ask);
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeUpdate);
+
+ assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+ .size());
+ SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
+
+ // ResourceRequest will be empty once NodeUpdate is completed
+ Assert.assertNull(app.getResourceRequest(priority, host));
+
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+ RMContainer rmContainer = app.getRMContainer(containerId1);
+
+ // Create a preempt event and register for preemption
+ scheduler.warnOrKillContainer(rmContainer);
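+ // warnOrKillContainer only records a preemption warning on the first call; once
+ // the configured wait time before kill (10ms here) has elapsed, a second call
+ // actually kills the container.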
+
+ // Wait for a few clock ticks
+ clock.tickSec(5);
+
+ // preempt now
+ scheduler.warnOrKillContainer(rmContainer);
+
+ // Trigger container rescheduled event
+ scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
+ SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
+
+ List<ResourceRequest> requests = rmContainer.getResourceRequests();
+ // Once recovered, resource request will be present again in app
+ Assert.assertEquals(3, requests.size());
+ for (ResourceRequest request : requests) {
+ Assert.assertEquals(1,
+ app.getResourceRequest(priority, request.getResourceName())
+ .getNumContainers());
+ }
+
+ // Send node heartbeat
+ scheduler.update();
+ scheduler.handle(nodeUpdate);
+
+ List<Container> containers = scheduler.allocate(appAttemptId,
+ Collections.<ResourceRequest> emptyList(),
+ Collections.<ContainerId> emptyList(), null, null, null, null).getContainers();
+
+ // Now with updated ResourceRequest, a container is allocated for AM.
+ Assert.assertTrue(containers.size() == 1);
+ }
+
@Test
public void testBlacklistNodes() throws Exception {
scheduler.init(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 5bdcc08..2456594 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -1451,7 +1451,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
// Trigger container rescheduled event
scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
- SchedulerEventType.KILL_PREEMPTED_CONTAINER));
+ SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app