diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 6ed90f816a5..80995b5cede 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -102,7 +102,8 @@ public void run() {
    * optimizing for least number of AM container preemptions. Only nodes
    * that match the locality level specified in the {@link ResourceRequest}
    * are considered. However, if this would lead to AM preemption, and locality
-   * relaxation is allowed, then the search space is expanded to all nodes.
+   * relaxation is allowed, then the search space is expanded to the remaining
+   * nodes.
    *
    * @param starvedApp starved application for which we are identifying
    *                   preemption targets
@@ -122,12 +123,19 @@
 
     // Don't preempt AM containers just to satisfy local requests if relax
     // locality is enabled.
-    if (bestContainers != null
-        && bestContainers.numAMContainers > 0
-        && !ResourceRequest.isAnyLocation(rr.getResourceName())
-        && rr.getRelaxLocality()) {
-      bestContainers = identifyContainersToPreemptForOneContainer(
-          scheduler.getNodeTracker().getAllNodes(), rr);
+    if (rr.getRelaxLocality()
+        && !ResourceRequest.isAnyLocation(rr.getResourceName())
+        && bestContainers != null
+        && bestContainers.numAMContainers > 0) {
+      List<FSSchedulerNode> remainingNodes =
+          scheduler.getNodeTracker().getAllNodes();
+      remainingNodes.removeAll(potentialNodes);
+      PreemptableContainers spareContainers =
+          identifyContainersToPreemptForOneContainer(remainingNodes, rr);
+      if (spareContainers != null && spareContainers.numAMContainers
+          < bestContainers.numAMContainers) {
+        bestContainers = spareContainers;
+      }
     }
 
     if (bestContainers != null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..ec2cf6b88b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -388,10 +388,7 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) {
   }
 
   private void setAllAMContainersOnNode(NodeId nodeId) {
-    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
-    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
-      ((RMContainerImpl) container).setAMContainer(true);
-    }
+    setNumAMContainersOnNode(Integer.MAX_VALUE, nodeId);
   }
 
   @Test
@@ -412,37 +409,62 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
         + "nodes.", !host0.equals(host1));
   }
 
-  @Test
-  public void testRelaxLocalityToNotPreemptAM() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    RMNode node1 = rmNodes.get(0);
-    setAllAMContainersOnNode(node1.getNodeID());
-    SchedulerNode node =
-        scheduler.getNodeTracker().getNode(node1.getNodeID());
-    ApplicationAttemptId greedyAppAttemptId =
-        node.getCopiedListOfRunningContainers().get(0)
-            .getApplicationAttemptId();
+  private void setNumAMContainersOnNode(int num, NodeId nodeId) {
+    int count = 0;
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+      count++;
+      if (count <= num) {
+        ((RMContainerImpl) container).setAMContainer(true);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    return node.getCopiedListOfRunningContainers().get(0)
+        .getApplicationAttemptId();
+  }
+
+  /**
+   * Send resource requests that allow relaxed locality to the scheduler.
+   * @param node the node from which we get the resource name
+   * @param nodeMemory memory of the NODE_LOCAL request
+   */
+  private void updateRelaxLocalityRequestSchedule(RMNode node,
+      int nodeMemory) {
 
     // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
     // satisfied. This forces the RR that we consider for preemption to be the
     // NODE_LOCAL one.
     ResourceRequest nodeRequest =
-        createResourceRequest(GB, node1.getHostName(), 1, 4, true);
+        createResourceRequest(nodeMemory, node.getHostName(), 1, 4, true);
     ResourceRequest rackRequest =
-        createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
+        createResourceRequest(GB * 10, node.getRackName(), 1, 1, true);
     ResourceRequest anyRequest =
-        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
+        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
     List<ResourceRequest> resourceRequests =
-        Arrays.asList(nodeRequest, rackRequest, anyRequest);
+        Arrays.asList(nodeRequest, rackRequest, anyRequest);
 
     ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
-        "root.preemptable.child-2", "default", resourceRequests);
+        "root.preemptable.child-2", "default", resourceRequests);
     starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
 
     // Move clock enough to identify starvation
     clock.tickSec(1);
     scheduler.update();
+  }
 
+  /**
+   * Verify the preemption result for relaxed locality.
+   * @param nodeId the node whose containers must not be preempted
+   * @param greedyAttemptId attempt id of the greedy app
+   * @throws Exception
+   */
+  private void verifyRelaxLocalityPreemption(
+      NodeId nodeId, ApplicationAttemptId greedyAttemptId) throws Exception {
     // Make sure 4 containers were preempted from the greedy app, but also that
     // none were preempted on our all-AM node, even though the NODE_LOCAL RR
     // asked for resources on it.
@@ -451,12 +473,37 @@ public void testRelaxLocalityToNotPreemptAM() throws Exception {
     // It should be possible to modify the RRs such that this is true
     // after YARN-7903.
     verifyPreemption(0, 4);
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
     for (RMContainer container : node.getCopiedListOfRunningContainers()) {
-      assert (container.isAMContainer());
-      assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
+      assert(container.isAMContainer());
+      assert(container.getApplicationAttemptId().equals(greedyAttemptId));
     }
   }
 
+  @Test
+  public void testRelaxLocalityToNotPreemptAM() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setAllAMContainersOnNode(node1.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node1.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB);
+    verifyRelaxLocalityPreemption(node1.getNodeID(), greedyAppAttemptId);
+  }
+
+  @Test
+  public void testRelaxLocalityToPreemptBestAM() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setNumAMContainersOnNode(3, node1.getNodeID());
+    RMNode node2 = rmNodes.get(1);
+    setAllAMContainersOnNode(node2.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node1.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB * 2);
+    verifyRelaxLocalityPreemption(node2.getNodeID(), greedyAppAttemptId);
+  }
+
   @Test
   public void testAppNotPreemptedBelowFairShare() throws Exception {
     takeAllResources("root.preemptable.child-1");
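
Below is a small standalone sketch (not part of the patch) of the selection rule the new FSPreemptionThread block applies: the candidate found on the locality-matched nodes is kept unless scanning the remaining nodes yields a candidate that preempts strictly fewer AM containers. The Candidate class and pickFewerAMPreemptions method are hypothetical stand-ins for PreemptableContainers and the inline comparison in the patch.

import java.util.Objects;

/**
 * Hypothetical stand-in for PreemptableContainers: a candidate set of
 * containers to preempt, tracking how many of them are AM containers.
 */
final class Candidate {
  final String source;        // where the candidate was found
  final int numAMContainers;  // AM containers this choice would preempt

  Candidate(String source, int numAMContainers) {
    this.source = Objects.requireNonNull(source);
    this.numAMContainers = numAMContainers;
  }
}

public class RelaxLocalitySelection {

  /**
   * Mirrors the patched logic: 'spare' (found on the remaining nodes)
   * replaces 'best' (found on the locality-matched nodes) only when it
   * preempts strictly fewer AM containers. As in the patch, this point is
   * reached only when best != null and best.numAMContainers > 0.
   */
  static Candidate pickFewerAMPreemptions(Candidate best, Candidate spare) {
    if (spare != null && spare.numAMContainers < best.numAMContainers) {
      return spare;
    }
    return best;
  }

  public static void main(String[] args) {
    Candidate best = new Candidate("node-local", 3);  // would preempt 3 AMs
    Candidate spare = new Candidate("remaining", 0);  // preempts no AMs
    // Prints "remaining": relaxing locality avoids the AM preemptions.
    System.out.println(pickFewerAMPreemptions(best, spare).source);
  }
}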