diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 47e580d8d14..1886f847dee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -110,27 +110,21 @@ public void run() {
 
     // Iterate through enough RRs to address app's starvation
     for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+      List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+          .getNodesByResourceName(rr.getResourceName());
       for (int i = 0; i < rr.getNumContainers(); i++) {
-        PreemptableContainers bestContainers = null;
-        List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
-            .getNodesByResourceName(rr.getResourceName());
-        int maxAMContainers = Integer.MAX_VALUE;
-
-        for (FSSchedulerNode node : potentialNodes) {
-          PreemptableContainers preemptableContainers =
-              identifyContainersToPreemptOnNode(
-                  rr.getCapability(), node, maxAMContainers);
-
-          if (preemptableContainers != null) {
-            // This set is better than any previously identified set.
-            bestContainers = preemptableContainers;
-            maxAMContainers = bestContainers.numAMContainers;
-
-            if (maxAMContainers == 0) {
-              break;
-            }
-          }
-        } // End of iteration through nodes for one RR
+        PreemptableContainers bestContainers =
+            identifyContainersToPreemptForOneContainer(potentialNodes, rr);
+
+        // Don't preempt AM containers just to satisfy local requests if relax
+        // locality is enabled.
+        if (bestContainers != null
+            && bestContainers.numAMContainers > 0
+            && !ResourceRequest.isAnyLocation(rr.getResourceName())
+            && rr.getRelaxLocality()) {
+          bestContainers = identifyContainersToPreemptForOneContainer(
+              scheduler.getNodeTracker().getAllNodes(), rr);
+        }
 
         if (bestContainers != null) {
           List<RMContainer> containers = bestContainers.getAllContainers();
@@ -153,6 +147,42 @@ public void run() {
     return containersToPreempt;
   }
 
+  /**
+   * Identify the containers to preempt for a single container of the given
+   * request, considering only the given candidate nodes. The AM-container
+   * count of the most recent set found is passed to
+   * {@code identifyContainersToPreemptOnNode} as the limit for the next
+   * node, and the search stops early once a set without AM containers is
+   * found.
+   *
+   * @param potentialNodes nodes to search for preemptable containers
+   * @param rr resource request whose capability must be satisfied
+   * @return the last non-null set reported for a node, or null if no node
+   *         reported one
+   */
+  private PreemptableContainers identifyContainersToPreemptForOneContainer(
+      List<FSSchedulerNode> potentialNodes, ResourceRequest rr) {
+    PreemptableContainers bestContainers = null;
+    int maxAMContainers = Integer.MAX_VALUE;
+
+    for (FSSchedulerNode node : potentialNodes) {
+      PreemptableContainers preemptableContainers =
+          identifyContainersToPreemptOnNode(
+              rr.getCapability(), node, maxAMContainers);
+
+      if (preemptableContainers != null) {
+        // This set is better than any previously identified set.
+        bestContainers = preemptableContainers;
+        maxAMContainers = bestContainers.numAMContainers;
+
+        if (maxAMContainers == 0) {
+          break;
+        }
+      }
+    }
+    return bestContainers;
+  }
+
   /**
    * Identify containers to preempt on a given node. Try to find a list with
    * least AM containers to avoid preempting AM containers. This method returns
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index ac5d9fe7afa..e6e8898e5fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -384,6 +387,14 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) {
     }
   }
 
+  private void setAllAMContainersOnNode(NodeId nodeId) {
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+      ((RMContainerImpl) container).setAMContainer(true);
+    }
+  }
+
+
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
     takeAllResources("root.preemptable.child-1");
@@ -403,6 +414,45 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
   }
 
   @Test
+  public void testRelaxLocalityToNotPreemptAM() throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setAllAMContainersOnNode(node1.getNodeID());
+    SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        node.getCopiedListOfRunningContainers().get(0).getApplicationAttemptId();
+
+    // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
+    // satisfied. This forces the RR that we consider for preemption to be the
+    // NODE_LOCAL one.
+    ResourceRequest nodeRequest = createResourceRequest(GB, node1.getHostName(), 1, 4, true);
+    ResourceRequest rackRequest = createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
+    ResourceRequest anyRequest = createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
+
+    List<ResourceRequest> resourceRequests =
+        new ArrayList<>(Arrays.asList(nodeRequest, rackRequest, anyRequest));
+
+    ApplicationAttemptId starvedAppAttemptId =
+        createSchedulingRequest("root.preemptable.child-2", "default", resourceRequests);
+    starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
+
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+
+    // Make sure 4 containers were preempted from the greedy app, but also that
+    // none were preempted on our all-AM node, even though the NODE_LOCAL RR
+    // asked for resources on it.
+
+    // TODO The starved app should be allocated 4 containers.
+    verifyPreemption(0, 4);
+    for (RMContainer container : node.getCopiedListOfRunningContainers()) {
+      assert (container.isAMContainer());
+      assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
+    }
+  }
+
+  @Test
   public void testAppNotPreemptedBelowFairShare() throws Exception {
     takeAllResources("root.preemptable.child-1");
     tryPreemptMoreThanFairShare("root.preemptable.child-2");