diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 6ed90f816a5..80995b5cede 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -102,7 +102,8 @@ public void run() {
    * optimizing for least number of AM container preemptions. Only nodes
    * that match the locality level specified in the {@link ResourceRequest}
    * are considered. However, if this would lead to AM preemption, and locality
-   * relaxation is allowed, then the search space is expanded to all nodes.
+   * relaxation is allowed, then the search space is expanded to the remaining
+   * nodes.
    *
    * @param starvedApp starved application for which we are identifying
    *                   preemption targets
@@ -122,12 +123,19 @@ public void run() {
       // Don't preempt AM containers just to satisfy local requests if relax
       // locality is enabled.
-      if (bestContainers != null
-          && bestContainers.numAMContainers > 0
-          && !ResourceRequest.isAnyLocation(rr.getResourceName())
-          && rr.getRelaxLocality()) {
-        bestContainers = identifyContainersToPreemptForOneContainer(
-            scheduler.getNodeTracker().getAllNodes(), rr);
+      if (rr.getRelaxLocality()
+          && !ResourceRequest.isAnyLocation(rr.getResourceName())
+          && bestContainers != null
+          && bestContainers.numAMContainers > 0) {
+        List<FSSchedulerNode> remainingNodes =
+            scheduler.getNodeTracker().getAllNodes();
+        remainingNodes.removeAll(potentialNodes);
+        PreemptableContainers spareContainers =
+            identifyContainersToPreemptForOneContainer(remainingNodes, rr);
+        if (spareContainers != null && spareContainers.numAMContainers
+            < bestContainers.numAMContainers) {
+          bestContainers = spareContainers;
+        }
       }
 
       if (bestContainers != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..23c893f1508 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -226,10 +226,8 @@ private void sendEnoughNodeUpdatesToAssignFully() {
     }
   }
 
-  /**
+  /*
    * Submit an application to a given queue and take over the entire cluster.
-   *
-   * @param queueName queue name
    */
   private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
@@ -246,13 +244,9 @@
         == greedyApp.isPreemptable());
   }
 
-  /**
+  /*
    * Submit an application to a given queue and preempt half resources of the
    * cluster.
-   *
-   * @param queueName queue name
-   * @throws InterruptedException
-   *           if any thread has interrupted the current thread.
    */
   private void preemptHalfResources(String queueName)
       throws InterruptedException {
@@ -266,14 +260,10 @@
     scheduler.update();
   }
 
-  /**
-   * Submit application to {@code queue1} and take over the entire cluster.
-   * Submit application with larger containers to {@code queue2} that
-   * requires preemption from the first application.
-   *
-   * @param queue1 first queue
-   * @param queue2 second queue
-   * @throws InterruptedException if interrupted while waiting
+  /*
+   * Submit application to queue1 and take over the entire cluster. Submit
+   * application with larger containers to queue2 that requires preemption
+   * from the first application.
    */
   private void submitApps(String queue1, String queue2)
       throws InterruptedException {
@@ -368,10 +358,8 @@ public void testNoPreemptionFromDisallowedQueue() throws Exception {
     verifyNoPreemption();
   }
 
-  /**
+  /*
    * Set the number of AM containers for each node.
-   *
-   * @param numAMContainersPerNode number of AM containers per node
    */
   private void setNumAMContainersPerNode(int numAMContainersPerNode) {
     List<FSSchedulerNode> potentialNodes =
@@ -388,10 +376,7 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) {
   }
 
   private void setAllAMContainersOnNode(NodeId nodeId) {
-    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
-    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
-      ((RMContainerImpl) container).setAMContainer(true);
-    }
+    setNumAMContainersOnNode(Integer.MAX_VALUE, nodeId);
   }
 
   @Test
@@ -412,37 +397,57 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
         + "nodes.", !host0.equals(host1));
   }
 
-  @Test
-  public void testRelaxLocalityToNotPreemptAM() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    RMNode node1 = rmNodes.get(0);
-    setAllAMContainersOnNode(node1.getNodeID());
-    SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID());
-    ApplicationAttemptId greedyAppAttemptId =
-        node.getCopiedListOfRunningContainers().get(0)
-            .getApplicationAttemptId();
+  private void setNumAMContainersOnNode(int num, NodeId nodeId) {
+    int count = 0;
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+      count++;
+      if (count <= num) {
+        ((RMContainerImpl) container).setAMContainer(true);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    return node.getCopiedListOfRunningContainers().get(0)
+        .getApplicationAttemptId();
+  }
+
+  /*
+   * Send resource requests that allow relaxed locality to the scheduler. The
+   * node/nodeMemory/numNodeContainers parameters apply to the NODE_LOCAL request.
+   */
+  private void updateRelaxLocalityRequestSchedule(RMNode node, int nodeMemory,
+      int numNodeContainers) {
     // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
     // satisfied. This forces the RR that we consider for preemption to be the
    // NODE_LOCAL one.
-    ResourceRequest nodeRequest =
-        createResourceRequest(GB, node1.getHostName(), 1, 4, true);
+    ResourceRequest nodeRequest = createResourceRequest(nodeMemory,
+        node.getHostName(), 1, numNodeContainers, true);
     ResourceRequest rackRequest =
-        createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
+        createResourceRequest(GB * 10, node.getRackName(), 1, 1, true);
     ResourceRequest anyRequest =
-        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
+        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
     List<ResourceRequest> resourceRequests =
-        Arrays.asList(nodeRequest, rackRequest, anyRequest);
+        Arrays.asList(nodeRequest, rackRequest, anyRequest);
     ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
-        "root.preemptable.child-2", "default", resourceRequests);
+        "root.preemptable.child-2", "default", resourceRequests);
     starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
 
     // Move clock enough to identify starvation
     clock.tickSec(1);
     scheduler.update();
+  }
 
+  private void verifyRelaxLocalityPreemption(NodeId notBePreemptedNodeId,
+      ApplicationAttemptId greedyAttemptId, int numGreedyAppContainers)
+      throws Exception {
     // Make sure 4 containers were preempted from the greedy app, but also that
     // none were preempted on our all-AM node, even though the NODE_LOCAL RR
     // asked for resources on it.
@@ -450,13 +455,39 @@ public void testRelaxLocalityToNotPreemptAM() throws Exception {
     // TODO (YARN-7655) The starved app should be allocated 4 containers.
     // It should be possible to modify the RRs such that this is true
     // after YARN-7903.
-    verifyPreemption(0, 4);
+    verifyPreemption(0, numGreedyAppContainers);
+    SchedulerNode node = scheduler.getNodeTracker()
+        .getNode(notBePreemptedNodeId);
     for (RMContainer container : node.getCopiedListOfRunningContainers()) {
-      assert (container.isAMContainer());
-      assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
+      assert(container.isAMContainer());
+      assert(container.getApplicationAttemptId().equals(greedyAttemptId));
     }
   }
 
+  @Test
+  public void testRelaxLocalityToNotPreemptAM() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setAllAMContainersOnNode(node1.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node1.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB, 4);
+    verifyRelaxLocalityPreemption(node1.getNodeID(), greedyAppAttemptId, 4);
+  }
+
+  @Test
+  public void testRelaxLocalityToPreemptBestAM() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setNumAMContainersOnNode(3, node1.getNodeID());
+    RMNode node2 = rmNodes.get(1);
+    setAllAMContainersOnNode(node2.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node2.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB * 2, 1);
+    verifyRelaxLocalityPreemption(node2.getNodeID(), greedyAppAttemptId, 6);
+  }
+
   @Test
   public void testAppNotPreemptedBelowFairShare() throws Exception {
     takeAllResources("root.preemptable.child-1");
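
Reviewer note: the sketch below restates the selection rule that the FSPreemptionThread hunk above introduces, stripped down so it can be read and compiled in isolation. CandidateSet, CandidateFinder and selectCandidates are illustrative stand-ins invented for this note, not the actual FSPreemptionThread API; the real code operates on PreemptableContainers and FSSchedulerNode exactly as shown in the patch.

import java.util.ArrayList;
import java.util.List;

public class PreemptionCandidateSelection {

  /** Minimal stand-in for PreemptableContainers: tracks how many of the
   *  selected containers are AM containers. */
  static class CandidateSet {
    final List<String> containers = new ArrayList<>();
    int numAMContainers;
  }

  /** Stand-in for identifyContainersToPreemptForOneContainer(nodes, rr). */
  interface CandidateFinder {
    CandidateSet findCandidates(List<String> nodes);
  }

  static CandidateSet selectCandidates(List<String> localityNodes,
      List<String> allNodes, boolean relaxLocality, boolean isAnyLocation,
      CandidateFinder finder) {
    // Pass 1: consider only the nodes that match the request's locality.
    CandidateSet best = finder.findCandidates(localityNodes);

    // Pass 2: only if pass 1 would preempt AM containers, the request is not
    // already ANY, and locality may be relaxed, search the *remaining* nodes
    // and keep that answer only when it preempts strictly fewer AM containers.
    if (relaxLocality && !isAnyLocation
        && best != null && best.numAMContainers > 0) {
      List<String> remainingNodes = new ArrayList<>(allNodes);
      remainingNodes.removeAll(localityNodes);
      CandidateSet spare = finder.findCandidates(remainingNodes);
      if (spare != null && spare.numAMContainers < best.numAMContainers) {
        best = spare;
      }
    }
    return best;
  }
}

Compared with the previous behaviour of simply re-running the search over getAllNodes(), restricting the second pass to the remaining nodes avoids rescanning the locality-matched nodes, and the locality-matched candidate set is kept unless the relaxed search preempts strictly fewer AM containers.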