diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 15fd830..b87c2ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.LinkedList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -382,6 +383,22 @@ public int getNumContainers() { } /** + * Get the running containers in the node with AM containers at the end. + * @return List of running containers in the node. + */ + public synchronized List getCopiedListOfRunningContainersWithAMInEnd() { + LinkedList result = new LinkedList<>(); + for (ContainerInfo info : launchedContainers.values()) { + if(info.container.isAMContainer()) { + result.addLast(info.container); + } else { + result.addFirst(info.container); + } + } + return result; + } + + /** + * Get the container for the specified container ID. 
* @param containerId The container ID * @return The container for the specified container ID diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index f432484..894f9b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -65,10 +65,10 @@ public void run() { try{ starvedApp = context.getStarvedApps().take(); if (!Resources.isNone(starvedApp.getStarvation())) { - List containers = + PreemptableContainers containers = identifyContainersToPreempt(starvedApp); if (containers != null) { - preemptContainers(containers); + preemptContainers(containers.containers); } } } catch (InterruptedException e) { @@ -87,9 +87,10 @@ public void run() { * @return list of containers to preempt to satisfy starvedApp, null if the * app cannot be satisfied by preempting any running containers */ - private List identifyContainersToPreempt( + private PreemptableContainers identifyContainersToPreempt( FSAppAttempt starvedApp) { - List containers = new ArrayList<>(); // return value + PreemptableContainers bestContainers = null; + int maxAMContainers = Integer.MAX_VALUE; // Find the nodes that match the next resource request SchedulingPlacementSet nextPs = @@ -107,9 +108,6 @@ public void run() { // From the potential nodes, pick a node that has enough containers // from apps over their fairshare for (FSSchedulerNode node : potentialNodes) { - 
// Reset containers for the new node being considered. - containers.clear(); - // TODO (YARN-5829): Attempt to reserve the node for starved app. The // subsequent if-check needs to be reworked accordingly. FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); @@ -119,39 +117,82 @@ public void run() { continue; } - // Figure out list of containers to consider - List containersToCheck = - node.getCopiedListOfRunningContainers(); - containersToCheck.removeAll(node.getContainersForPreemption()); - - // Initialize potential with unallocated resources - Resource potential = Resources.clone(node.getUnallocatedResource()); - for (RMContainer container : containersToCheck) { - FSAppAttempt app = - scheduler.getSchedulerApp(container.getApplicationAttemptId()); - - if (app.canContainerBePreempted(container)) { - // Flag container for preemption - containers.add(container); - Resources.addTo(potential, container.getAllocatedResource()); + PreemptableContainers preemptableContainers = + identifyContainersToPreemptOnNode(requestCapability, node, + maxAMContainers); + if (preemptableContainers != null) { + if (preemptableContainers.numAMContainers == 0) { + return preemptableContainers; + } else { + bestContainers = preemptableContainers; + maxAMContainers = bestContainers.numAMContainers; } + } + } - // Check if we have already identified enough containers - if (Resources.fitsIn(requestCapability, potential)) { - // Mark the containers as being considered for preemption on the node. - // Make sure the containers are subsequently removed by calling - // FSSchedulerNode#removeContainerForPreemption. - node.addContainersForPreemption(containers); - return containers; - } else { - // TODO (YARN-5829): Unreserve the node for the starved app. + return bestContainers; + } + + /** + * Identify containers to preempt on one given node. Try to find a list with + * the fewest AM containers to avoid preempting them. 
So, this method checks + * the non-AM containers first, returning the first container list satisfying + * the resource request if the number of AM containers is less than + * maxAMContainer, or null otherwise. + * + * @param request resource requested + * @param node the node to check + * @param maxAMContainer the number of AM containers of the last best solution + * @return a list of preemptable containers; null if we cannot find a list + * of containers to preempt on this node or if doing so would + * require preempting maxAMContainer or more AM containers. + */ + private PreemptableContainers identifyContainersToPreemptOnNode( + Resource request, FSSchedulerNode node, int maxAMContainer) { + PreemptableContainers preemptableContainers = + new PreemptableContainers(maxAMContainer); + + // Figure out list of containers to consider + List containersToCheck = + node.getCopiedListOfRunningContainersWithAMInEnd(); + containersToCheck.removeAll(node.getContainersForPreemption()); + + // Initialize potential with unallocated resources + Resource potential = Resources.clone(node.getUnallocatedResource()); + + for (RMContainer container : containersToCheck) { + FSAppAttempt app = + scheduler.getSchedulerApp(container.getApplicationAttemptId()); + + if (app.canContainerBePreempted(container)) { + // Flag container for preemption + if (!preemptableContainers.addContainer(container)) { + return null; } + + Resources.addTo(potential, container.getAllocatedResource()); + } + + // Check if we have already identified enough containers + if (Resources.fitsIn(request, potential)) { + return preemptableContainers; + } else { + // TODO (YARN-5829): Unreserve the node for the starved app. } } return null; } private void preemptContainers(List containers) { + // Mark the containers as being considered for preemption on the node. + // Make sure the containers are subsequently removed by calling + // FSSchedulerNode#removeContainerForPreemption. 
+ if (containers.size() > 0) { + FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker() + .getNode(containers.get(0).getNodeId()); + node.addContainersForPreemption(containers); + } + // Warn application about containers to be killed for (RMContainer container : containers) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); @@ -190,4 +231,38 @@ public void run() { } } } + + /** + * A class to track preemptable containers. + */ + private static class PreemptableContainers { + List containers; + int numAMContainers; + int maxAMContainers; + + PreemptableContainers(int maxAMContainers) { + containers = new ArrayList<>(); + numAMContainers = 0; + this.maxAMContainers = maxAMContainers; + } + + /** + * Add a container to the list unless it is an AM container that would bring + * the number of AM containers up to maxAMContainers. + * + * @param container the container to add + * @return true on success; false otherwise + */ + private boolean addContainer(RMContainer container) { + if (container.isAMContainer()) { + numAMContainers++; + if (numAMContainers >= maxAMContainers) { + return false; + } + } + + containers.add(container); + return true; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 36ee685..c2c4d1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ 
-19,11 +19,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; + +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,8 +37,10 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; /** * Tests to verify fairshare and minshare preemption, using parameterization. @@ -42,6 +48,7 @@ @RunWith(Parameterized.class) public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); + private static final int GB = 1024; // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; @@ -164,8 +171,8 @@ private void setupCluster() throws IOException { scheduler = (FairScheduler) resourceManager.getResourceScheduler(); // Create and add two nodes to the cluster - addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); - addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); } private void sendEnoughNodeUpdatesToAssignFully() { @@ -179,37 +186,56 @@ private void sendEnoughNodeUpdatesToAssignFully() { } /** - * Submit application to {@code queue1} and take over the entire cluster. 
- * Submit application with larger containers to {@code queue2} that - * requires preemption from the first application. + * Submit an application to a given queue and take over the entire cluster. * - * @param queue1 first queue - * @param queue2 second queue - * @throws InterruptedException if interrupted while waiting + * @param queueName queue name */ - private void submitApps(String queue1, String queue2) - throws InterruptedException { + private void takeAllResource(String queueName) { // Create an app that takes up all the resources on the cluster - ApplicationAttemptId appAttemptId1 - = createSchedulingRequest(1024, 1, queue1, "default", + ApplicationAttemptId appAttemptId + = createSchedulingRequest(GB, 1, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size()); - greedyApp = scheduler.getSchedulerApp(appAttemptId1); + greedyApp = scheduler.getSchedulerApp(appAttemptId); scheduler.update(); sendEnoughNodeUpdatesToAssignFully(); assertEquals(8, greedyApp.getLiveContainers().size()); + } - // Create an app that takes up all the resources on the cluster - ApplicationAttemptId appAttemptId2 - = createSchedulingRequest(2048, 2, queue2, "default", + /** + * Submit an application to a given queue and preempt half resources of the + * cluster. + * + * @param queueName queue name + * @throws InterruptedException + * if any thread has interrupted the current thread. + */ + private void preemptHalfResources(String queueName) + throws InterruptedException { + ApplicationAttemptId appAttemptId + = createSchedulingRequest(2 * GB, 2, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); - starvingApp = scheduler.getSchedulerApp(appAttemptId2); + starvingApp = scheduler.getSchedulerApp(appAttemptId); // Sleep long enough to pass Thread.sleep(10); - scheduler.update(); } + /** + * Submit application to {@code queue1} and take over the entire cluster. 
+ * Submit application with larger containers to {@code queue2} that + * requires preemption from the first application. + * + * @param queue1 first queue + * @param queue2 second queue + * @throws InterruptedException if interrupted while waiting + */ + private void submitApps(String queue1, String queue2) + throws InterruptedException { + takeAllResource(queue1); + preemptHalfResources(queue2); + } + private void verifyPreemption() throws InterruptedException { // Sleep long enough for four containers to be preempted. Note that the // starved app must be queued four times for containers to be preempted. @@ -272,4 +298,64 @@ public void testNoPreemptionFromDisallowedQueue() throws Exception { submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1"); verifyNoPreemption(); } + + /** + * Set the number of AM containers for each node. + * + * @param numOfAMConainerPerNode number of AM containers per node + */ + private void setAmContainerNumPerNode(int numOfAMConainerPerNode) { + List potentialNodes = + scheduler.getNodeTracker().getNodesByResourceName("*"); + for (FSSchedulerNode node: potentialNodes) { + List containers= + node.getCopiedListOfRunningContainers(); + // Change the first numOfAMConainerPerNode out of 4 containers to + // AM containers + for (int i = 0; i < numOfAMConainerPerNode; i++) { + ((RMContainerImpl) containers.get(i)).setAMContainer(true); + } + } + } + + @Test + public void testPreemptionSelectNonAMContainer() throws Exception { + setupCluster(); + + takeAllResource("root.preemptable.child-1"); + setAmContainerNumPerNode(2); + preemptHalfResources("root.preemptable.child-2"); + + verifyPreemption(); + + ArrayList containers = + (ArrayList) starvingApp.getLiveContainers(); + String host0 = containers.get(0).getNodeId().getHost(); + String host1 = containers.get(1).getNodeId().getHost(); + // Each node provides two and only two non-AM containers to be preempted, so + // the preemption happens on both nodes. 
+ Assert.assertTrue("Preempted containers should come from two different " + + "nodes.", !host0.equals(host1)); + } + + @Test + public void testPreemptionSelectAMContainers() throws Exception { + setupCluster(); + + takeAllResource("root.preemptable.child-1"); + setAmContainerNumPerNode(3); + preemptHalfResources("root.preemptable.child-2"); + + verifyPreemption(); + + ArrayList containers = + (ArrayList) starvingApp.getLiveContainers(); + String host0 = containers.get(0).getNodeId().getHost(); + String host1 = containers.get(1).getNodeId().getHost(); + // Cannot find a solution without AM containers since each solution needs at + // least two containers per node and there is only one non-AM container + // per node, so the two containers will be preempted from the same node. + Assert.assertTrue("Preempted containers should come from the same nodes.", + host0.equals(host1)); + } }