diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 3579857..ff34a0f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -65,7 +65,7 @@ public void run() {
       if (!Resources.isNone(starvedApp.getStarvation())) {
         List<RMContainer> containers =
             identifyContainersToPreempt(starvedApp);
-        if (containers != null) {
+        if (containers != null && containers.size() > 0) {
           preemptContainers(containers);
         }
       }
@@ -77,6 +77,33 @@ public void run() {
   }
 
   /**
+   * Move non-AM containers to the front.
+   *
+   * @param containers the container list
+   */
+  private void moveNonAMContainerFirst(List<RMContainer> containers) {
+    int left = 0;
+    int right = containers.size() - 1;
+    while (left < right) {
+      if (containers.get(left).isAMContainer()) {
+        if (!containers.get(right).isAMContainer()) {
+          // swap
+          RMContainer rightContainer = containers.get(right);
+          containers.set(right, containers.get(left));
+          containers.set(left, rightContainer);
+
+          left++;
+          right--;
+        } else {
+          right--;
+        }
+      } else {
+        left++;
+      }
+    }
+  }
+
+  /**
    * Given an app, identify containers to preempt to satisfy the app's next
    * resource request.
    *
@@ -87,12 +114,13 @@ public void run() {
    */
   private List<RMContainer> identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
-    List<RMContainer> containers = new ArrayList<>(); // return value
+    PreemptableContainers containers = new PreemptableContainers();
+    containers.numAMContainer = Integer.MAX_VALUE;
 
     // Find the nodes that match the next resource request
     ResourceRequest request = starvedApp.getNextResourceRequest();
-    // TODO (KK): Should we check other resource requests if we can't match
-    // the first one?
+    // TODO (YARN-6038): Should we check other resource requests if we can't
+    // match the first one?
 
     Resource requestCapability = request.getCapability();
     List<FSSchedulerNode> potentialNodes =
@@ -102,9 +130,6 @@ public void run() {
     // From the potential nodes, pick a node that has enough containers
     // from apps over their fairshare
     for (FSSchedulerNode node : potentialNodes) {
-      // Reset containers for the new node being considered.
-      containers.clear();
-
       // TODO (YARN-5829): Attempt to reserve the node for starved app. The
       // subsequent if-check needs to be reworked accordingly.
       FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
@@ -114,33 +139,74 @@ public void run() {
         continue;
       }
 
-      // Figure out list of containers to consider
-      List<RMContainer> containersToCheck =
-          node.getCopiedListOfRunningContainers();
-      containersToCheck.removeAll(node.getContainersForPreemption());
-
-      // Initialize potential with unallocated resources
-      Resource potential = Resources.clone(node.getUnallocatedResource());
-      for (RMContainer container : containersToCheck) {
-        FSAppAttempt app =
-            scheduler.getSchedulerApp(container.getApplicationAttemptId());
-
-        if (app.canContainerBePreempted(container)) {
-          // Flag container for preemption
-          containers.add(container);
-          Resources.addTo(potential, container.getAllocatedResource());
-        }
+      PreemptableContainers preemptableContainers =
+          identifyContainersToPreemptOnNode(requestCapability, node,
+              containers.numAMContainer);
+      if (preemptableContainers == null) {
+        continue;
+      }
 
-        // Check if we have already identified enough containers
-        if (Resources.fitsIn(requestCapability, potential)) {
-          // Mark the containers as being considered for preemption on the node.
-          // Make sure the containers are subsequently removed by calling
-          // FSSchedulerNode#removeContainerForPreemption.
-          node.addContainersForPreemption(containers);
-          return containers;
-        } else {
-          // TODO (YARN-5829): Unreserve the node for the starved app.
+      if (preemptableContainers.numAMContainer == 0) {
+        return preemptableContainers.containers;
+      }
+
+      assert preemptableContainers.numAMContainer < containers.numAMContainer;
+      containers.containers = preemptableContainers.containers;
+      containers.numAMContainer = preemptableContainers.numAMContainer;
+    }
+
+    return containers.containers;
+  }
+
+  /**
+   * Identify containers to preempt on one node.
+   *
+   * @param request resource requested
+   * @param node the node to check
+   * @param numAMContainer the smallest number of AM containers found so far
+   * @return a list of preemptable containers, or null if none is found
+   */
+  private PreemptableContainers identifyContainersToPreemptOnNode(
+      Resource request, FSSchedulerNode node, int numAMContainer) {
+    PreemptableContainers preemptableContainers = new PreemptableContainers();
+
+    // Figure out list of containers to consider
+    List<RMContainer> containersToCheck =
+        node.getCopiedListOfRunningContainers();
+    containersToCheck.removeAll(node.getContainersForPreemption());
+
+    // Initialize potential with unallocated resources
+    Resource potential = Resources.clone(node.getUnallocatedResource());
+
+    moveNonAMContainerFirst(containersToCheck);
+
+    for (RMContainer container : containersToCheck) {
+      FSAppAttempt app =
+          scheduler.getSchedulerApp(container.getApplicationAttemptId());
+
+      if (app.canContainerBePreempted(container)) {
+        // Flag container for preemption
+        preemptableContainers.containers.add(container);
+        if (container.isAMContainer()) {
+          preemptableContainers.numAMContainer++;
+          // Stop if this node needs as many or more AM containers than the
+          // best solution found so far
+          if (preemptableContainers.numAMContainer >= numAMContainer) {
+            return null;
+          }
+        }
+
+        Resources.addTo(potential, container.getAllocatedResource());
+      }
+
+      // Check if we have already identified enough containers
+      if (Resources.fitsIn(request, potential)) {
+        // Mark the containers as being considered for preemption on the node.
+        // Make sure the containers are subsequently removed by calling
+        // FSSchedulerNode#removeContainerForPreemption.
+        node.addContainersForPreemption(preemptableContainers.containers);
+        return preemptableContainers;
+      } else {
+        // TODO (YARN-5829): Unreserve the node for the starved app.
       }
     }
     return null;
@@ -185,4 +251,14 @@ public void run() {
       }
     }
   }
+
+  private class PreemptableContainers {
+    List<RMContainer> containers;
+    int numAMContainer;
+
+    PreemptableContainers() {
+      containers = new ArrayList<>();
+      numAMContainer = 0;
+    }
+  }
 }
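With this change, identifyContainersToPreempt keeps scanning candidate nodes until it finds a set of containers that contains no AM containers; if no such node exists, it falls back to the candidate set with the fewest AM containers seen so far. Because the method can now return an empty list (rather than null) when no node can satisfy the request, the caller additionally checks containers.size() > 0 before preempting. The standalone sketch below illustrates that selection strategy with simplified, hypothetical types; SimpleContainer, Candidate, pickOnNode, and pickAcrossNodes are not YARN classes, the per-container canContainerBePreempted check and the node's unallocated headroom are omitted, and the two-pointer moveNonAMContainerFirst partition is replaced by an equivalent sort that orders non-AM containers first.

import java.util.ArrayList;
import java.util.List;

class PreemptionSelectionSketch {
  /** Minimal stand-in for a running container. */
  static class SimpleContainer {
    final boolean amContainer;
    final int memoryMb;

    SimpleContainer(boolean amContainer, int memoryMb) {
      this.amContainer = amContainer;
      this.memoryMb = memoryMb;
    }
  }

  /** Candidate solution for one node: the chosen containers and the AM count. */
  static class Candidate {
    final List<SimpleContainer> containers = new ArrayList<>();
    int numAM = 0;
  }

  /**
   * Pick containers on one node until the request fits, visiting non-AM
   * containers first. Gives up (returns null) as soon as the candidate would
   * use at least bestNumAM AM containers, mirroring
   * identifyContainersToPreemptOnNode in the patch above.
   */
  static Candidate pickOnNode(List<SimpleContainer> running, int requestMb,
      int bestNumAM) {
    List<SimpleContainer> ordered = new ArrayList<>(running);
    // Non-AM containers first (false sorts before true).
    ordered.sort((a, b) -> Boolean.compare(a.amContainer, b.amContainer));

    Candidate candidate = new Candidate();
    int potentialMb = 0;
    for (SimpleContainer container : ordered) {
      candidate.containers.add(container);
      if (container.amContainer && ++candidate.numAM >= bestNumAM) {
        return null;
      }
      potentialMb += container.memoryMb;
      if (potentialMb >= requestMb) {
        return candidate;
      }
    }
    return null; // this node cannot satisfy the request
  }

  /**
   * Across nodes, return the first zero-AM solution; otherwise keep the
   * solution with the fewest AM containers. May return an empty list, which
   * is why the caller also checks the size.
   */
  static List<SimpleContainer> pickAcrossNodes(
      List<List<SimpleContainer>> nodes, int requestMb) {
    List<SimpleContainer> best = new ArrayList<>();
    int bestNumAM = Integer.MAX_VALUE;
    for (List<SimpleContainer> node : nodes) {
      Candidate candidate = pickOnNode(node, requestMb, bestNumAM);
      if (candidate == null) {
        continue;
      }
      if (candidate.numAM == 0) {
        return candidate.containers;
      }
      best = candidate.containers;
      bestNumAM = candidate.numAM;
    }
    return best;
  }
}

The stable sort is used here only for brevity; the patch's two-pointer swap achieves the same non-AM-first ordering in place, without allocating a copy of the list.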
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 36ee685..c2c4d1a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -19,11 +19,15 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,8 +37,10 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Tests to verify fairshare and minshare preemption, using parameterization.
@@ -42,6 +48,7 @@
 @RunWith(Parameterized.class)
 public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private static final int GB = 1024;
 
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
@@ -164,8 +171,8 @@ private void setupCluster() throws IOException {
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
 
     // Create and add two nodes to the cluster
-    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
-    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
   }
 
   private void sendEnoughNodeUpdatesToAssignFully() {
@@ -179,37 +186,56 @@ private void sendEnoughNodeUpdatesToAssignFully() {
   }
 
   /**
-   * Submit application to {@code queue1} and take over the entire cluster.
-   * Submit application with larger containers to {@code queue2} that
-   * requires preemption from the first application.
+   * Submit an application to a given queue and take over the entire cluster.
    *
-   * @param queue1 first queue
-   * @param queue2 second queue
-   * @throws InterruptedException if interrupted while waiting
+   * @param queueName queue name
    */
-  private void submitApps(String queue1, String queue2)
-      throws InterruptedException {
+  private void takeAllResource(String queueName) {
     // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId1
-        = createSchedulingRequest(1024, 1, queue1, "default",
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(GB, 1, queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size());
-    greedyApp = scheduler.getSchedulerApp(appAttemptId1);
+    greedyApp = scheduler.getSchedulerApp(appAttemptId);
     scheduler.update();
     sendEnoughNodeUpdatesToAssignFully();
     assertEquals(8, greedyApp.getLiveContainers().size());
+  }
 
-    // Create an app that takes up all the resources on the cluster
-    ApplicationAttemptId appAttemptId2
-        = createSchedulingRequest(2048, 2, queue2, "default",
+  /**
+   * Submit an application to a given queue and preempt half of the
+   * cluster's resources.
+   *
+   * @param queueName queue name
+   * @throws InterruptedException
+   *           if any thread has interrupted the current thread.
+   */
+  private void preemptHalfResources(String queueName)
+      throws InterruptedException {
+    ApplicationAttemptId appAttemptId
+        = createSchedulingRequest(2 * GB, 2, queueName, "default",
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
-    starvingApp = scheduler.getSchedulerApp(appAttemptId2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
     // Sleep long enough to pass
     Thread.sleep(10);
-    scheduler.update();
   }
 
+  /**
+   * Submit application to {@code queue1} and take over the entire cluster.
+   * Submit application with larger containers to {@code queue2} that
+   * requires preemption from the first application.
+   *
+   * @param queue1 first queue
+   * @param queue2 second queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void submitApps(String queue1, String queue2)
+      throws InterruptedException {
+    takeAllResource(queue1);
+    preemptHalfResources(queue2);
+  }
+
   private void verifyPreemption() throws InterruptedException {
     // Sleep long enough for four containers to be preempted. Note that the
     // starved app must be queued four times for containers to be preempted.
@@ -272,4 +298,64 @@ public void testNoPreemptionFromDisallowedQueue() throws Exception {
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
+
+  /**
+   * Set the number of AM containers on each node.
+   *
+   * @param numOfAMContainerPerNode number of AM containers per node
+   */
+  private void setAmContainerNumPerNode(int numOfAMContainerPerNode) {
+    List<FSSchedulerNode> potentialNodes =
+        scheduler.getNodeTracker().getNodesByResourceName("*");
+    for (FSSchedulerNode node : potentialNodes) {
+      List<RMContainer> containers =
+          node.getCopiedListOfRunningContainers();
+      // Change the first numOfAMContainerPerNode out of 4 containers to
+      // AM containers
+      for (int i = 0; i < numOfAMContainerPerNode; i++) {
+        ((RMContainerImpl) containers.get(i)).setAMContainer(true);
+      }
+    }
+  }
+
+  @Test
+  public void testPreemptionSelectNonAMContainer() throws Exception {
+    setupCluster();
+
+    takeAllResource("root.preemptable.child-1");
+    setAmContainerNumPerNode(2);
+    preemptHalfResources("root.preemptable.child-2");
+
+    verifyPreemption();
+
+    ArrayList<RMContainer> containers =
+        (ArrayList<RMContainer>) starvingApp.getLiveContainers();
+    String host0 = containers.get(0).getNodeId().getHost();
+    String host1 = containers.get(1).getNodeId().getHost();
+    // Each node can spare exactly two non-AM containers for preemption, so
+    // the preemption happens on both nodes.
+    Assert.assertTrue("Preempted containers should come from two different "
+        + "nodes.", !host0.equals(host1));
+  }
+
+  @Test
+  public void testPreemptionSelectAMContainers() throws Exception {
+    setupCluster();
+
+    takeAllResource("root.preemptable.child-1");
+    setAmContainerNumPerNode(3);
+    preemptHalfResources("root.preemptable.child-2");
+
+    verifyPreemption();
+
+    ArrayList<RMContainer> containers =
+        (ArrayList<RMContainer>) starvingApp.getLiveContainers();
+    String host0 = containers.get(0).getNodeId().getHost();
+    String host1 = containers.get(1).getNodeId().getHost();
+    // No solution without AM containers exists: each solution needs at least
+    // two containers from one node, but only one non-AM container is left
+    // per node, so the two containers will be preempted from the same node.
+    Assert.assertTrue("Preempted containers should come from the same node.",
+        host0.equals(host1));
+  }
 }
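For reference, the arithmetic behind the two assertions above follows from the test setup in this file: two 4 GB / 4 vcore nodes, a greedy app holding eight 1 GB containers (four per node), and a starving app whose 2 GB containers each require two 1 GB containers to be preempted on a single node. Whether a node can offer a zero-AM solution depends only on how many of its four containers were flagged as AM containers. A small, hypothetical standalone check (PreemptionScenarioMath is not part of the patch):

public class PreemptionScenarioMath {
  public static void main(String[] args) {
    // From the test setup: NODE_CAPACITY_MULTIPLE = 4, so each node runs four
    // 1 GB containers of the greedy app; each starved 2 GB container needs
    // two of them preempted on the same node.
    int containersPerNode = 4;
    int containersNeededPerRequest = 2;

    for (int amPerNode : new int[] {2, 3}) {
      int nonAmPerNode = containersPerNode - amPerNode;
      boolean zeroAmSolutionPerNode =
          nonAmPerNode >= containersNeededPerRequest;
      System.out.printf(
          "%d AM containers per node -> %d non-AM left; "
              + "zero-AM solution on a single node: %b%n",
          amPerNode, nonAmPerNode, zeroAmSolutionPerNode);
    }
    // With 2 AMs per node (testPreemptionSelectNonAMContainer) each node can
    // still give up two non-AM containers, so preemption avoids AM containers
    // and, per the test's comment, happens on both nodes.
    // With 3 AMs per node (testPreemptionSelectAMContainers) only one non-AM
    // container remains per node, so any two-container solution includes an
    // AM container and, per the test's comment, both preempted containers
    // come from the same node.
  }
}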