diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 18ccf9d..fca2b12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -970,37 +970,27 @@ private synchronized void nodeUpdate(RMNode nm) {
     }
   }
 
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      // Sort the nodes by space available on them, so that we offer
-      // containers on emptier nodes first, facilitating an even spread. This
-      // requires holding the scheduler lock, so that the space available on a
-      // node doesn't change during the sort.
-      synchronized (this) {
-        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-      }
+  protected void continuousSchedulingAttempt() {
+    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
 
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = getFSSchedulerNode(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                ": " + ex.toString(), ex);
-          }
-        }
-      }
+    // iterate all nodes
+    for (NodeId nodeId : nodeIdList) {
+      FSSchedulerNode node = getFSSchedulerNode(nodeId);
       try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-            e.toString(), e);
+        if (node != null && Resources.fitsIn(minimumAllocation,
+            node.getAvailableResource())) {
+          attemptScheduling(node);
+        }
+      } catch (Throwable ex) {
+        LOG.warn("Error while attempting scheduling for node " + node +
+            ": " + ex.toString(), ex);
       }
     }
   }
@@ -1010,6 +1000,12 @@ private void continuousScheduling() {
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
+      if (!nodes.containsKey(n1)) {
+        return 1;
+      }
+      if (!nodes.containsKey(n2)) {
+        return -1;
+      }
       return RESOURCE_CALCULATOR.compare(clusterResource,
           nodes.get(n2).getAvailableResource(),
           nodes.get(n1).getAvailableResource());
@@ -1234,7 +1230,16 @@ private synchronized void initScheduler(Configuration conf)
          new Runnable() {
            @Override
            public void run() {
-              continuousScheduling();
+              while (true) {
+                continuousSchedulingAttempt();
+
+                try {
+                  Thread.sleep(getContinuousSchedulingSleepMs());
+                } catch (InterruptedException e) {
+                  LOG.warn("Error while doing sleep in continuous scheduling: " +
+                      e.toString(), e);
+                }
+              }
            }
          }
      );
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ed492ce..cd8bdaf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2763,7 +2763,47 @@ public void testContinuousScheduling() throws Exception {
 
     Assert.assertEquals(2, nodes.size());
   }
-
+  @Test
+  public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+    // Disable continuous scheduling, will invoke continuous scheduling once manually
+    FairScheduler fs = new FairScheduler();
+    Configuration conf = createConfiguration();
+    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+        false);
+    fs.setRMContext(resourceManager.getRMContext());
+    fs.init(conf);
+    fs.start();
+    fs.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add two nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    fs.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    fs.handle(nodeEvent2);
+    Assert.assertEquals("We should have two alive nodes.",
+        2, fs.getNumClusterNodes());
+
+    // Remove one node
+    NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+    fs.handle(removeNode1);
+    Assert.assertEquals("We should only have one alive node.",
+        1, fs.getNumClusterNodes());
+
+    // Invoke the continuous scheduling once
+    try {
+      fs.continuousSchedulingAttempt();
+    } catch (NullPointerException e) {
+      fail("Exception happened when doing continuous scheduling. " +
+          e.toString());
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
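
Note (illustrative, not part of the patch): the NullPointerException guarded against above comes from the gap between snapshotting nodes.keySet() and looking a node back up, during which a NodeRemovedSchedulerEvent can delete the entry. Below is a minimal standalone sketch of that null-guard pattern; the class and method names (NodeMapSketch, scheduleOnce, availableByNode) are hypothetical, not the real FairScheduler API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a concurrently mutated node map, mirroring the
// race continuousSchedulingAttempt has to tolerate when a node is removed
// between the key-set snapshot and the per-node lookup.
class NodeMapSketch {
  private final ConcurrentHashMap<String, Integer> availableByNode =
      new ConcurrentHashMap<>();

  void scheduleOnce(int minimumAllocation) {
    // Snapshot the keys; another thread may remove entries after this point.
    List<String> nodeIds = new ArrayList<>(availableByNode.keySet());
    for (String nodeId : nodeIds) {
      Integer available = availableByNode.get(nodeId); // null if node was removed
      if (available != null && available >= minimumAllocation) {
        // ... attempt scheduling on this node ...
      }
    }
  }
}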