diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de56c02ae3a2f6ac3851eafa26f7fbd50036..b38118b4a8c90c370b28f719c02924112e0676a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1040,12 +1040,21 @@ public int compare(NodeId n1, NodeId n2) { } } - private synchronized void attemptScheduling(FSSchedulerNode node) { + protected synchronized void attemptScheduling(FSSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; } + final NodeId nodeID = node.getNodeID(); + if (!nodes.containsKey(nodeID)) { + // The node might have just been removed while this thread was waiting + // on the synchronized lock before it entered this synchronized method + LOG.info("Skipping scheduling as the node " + nodeID + + " has been removed"); + return; + } + // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 69e0a8c2edb51a47c9f9f1dc8da903472e6d2379..74fa57c54c8bef3b5543e81fe33f9ba8535b8b69 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -43,7 +44,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.xml.parsers.ParserConfigurationException; @@ -707,9 +713,10 @@ public void testSimpleContainerAllocation() throws IOException { scheduler.handle(updateEvent); // Asked for less than increment allocation. - assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + assertEquals(FairSchedulerConfiguration + .DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); @@ -761,7 +768,7 @@ public void testSimpleContainerReservation() throws Exception { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity @@ -3890,6 +3897,50 @@ public void testContinuousSchedulingWithNodeRemoved() throws Exception { } @Test + public void testSchedulingOnRemovedNode() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + ApplicationAttemptId id11 = createAppAttemptId(1, 1); + createMockRMApp(id11); + + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", + false); + scheduler.addApplicationAttempt(id11, false, false); + + List ask1 = new ArrayList<>(); + ResourceRequest request1 = + createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true); + + ask1.add(request1); + scheduler.allocate(id11, ask1, new ArrayList(), null, + null); + + String hostName = "127.0.0.1"; + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * 1024, 8), 1, hostName); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + FSSchedulerNode node = (FSSchedulerNode)scheduler.getSchedulerNode( + node1.getNodeID()); + + NodeRemovedSchedulerEvent removeNode1 = + new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + + scheduler.attemptScheduling(node); + + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(id11, + RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent1); + } + + @Test public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured() throws IOException { // This test verifies if default rule in queue placement policy