diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de56c02ae3a2f6ac3851eafa26f7fbd50036..07b32714a2a219e43a82b66f357b5cdc9bb7d14c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1039,13 +1039,23 @@ public int compare(NodeId n1, NodeId n2) { nodes.get(n1).getAvailableResource()); } } - - private synchronized void attemptScheduling(FSSchedulerNode node) { + + @VisibleForTesting + synchronized void attemptScheduling(FSSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; } + final NodeId nodeID = node.getNodeID(); + if (!nodes.containsKey(nodeID)) { + // The node might have just been removed while this thread was waiting + // on the synchronized lock before it entered this synchronized method + LOG.info("Skipping scheduling as the node " + nodeID + + " has been removed"); + return; + } + // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 69e0a8c2edb51a47c9f9f1dc8da903472e6d2379..94fdc1a73686e58ac0bacedeb313f73fa51dee1e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3890,6 +3890,50 @@ public void testContinuousSchedulingWithNodeRemoved() throws Exception { } @Test + public void testSchedulingOnRemovedNode() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + ApplicationAttemptId id11 = createAppAttemptId(1, 1); + createMockRMApp(id11); + + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", + false); + scheduler.addApplicationAttempt(id11, false, false); + + List ask1 = new ArrayList<>(); + ResourceRequest request1 = + createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true); + + ask1.add(request1); + scheduler.allocate(id11, ask1, new ArrayList(), null, + null); + + String hostName = "127.0.0.1"; + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * 1024, 8), 1, hostName); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + FSSchedulerNode node = (FSSchedulerNode)scheduler.getSchedulerNode( + node1.getNodeID()); + + NodeRemovedSchedulerEvent removeNode1 = + new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + + scheduler.attemptScheduling(node); + + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(id11, + RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent1); + } + + @Test public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured() throws IOException { // This test verifies if default rule in queue placement policy