diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3a847ce..40d1033 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1021,6 +1021,49 @@ private void continuousScheduling() {
     }
   }
 
+  /**
+   * Performs a single continuous-scheduling pass over the given nodes.
+   * Ids of nodes that have been removed from the cluster since the list
+   * was built are skipped, so stale entries cannot trigger a
+   * NullPointerException.
+   *
+   * @param nodeIdList ids of the nodes to attempt scheduling on
+   */
+  protected void oneTimeContinuousScheduling(List<NodeId> nodeIdList) {
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
+
+    // Iterate all nodes; a node may have been removed after the id list
+    // was built, so skip ids that no longer resolve to a live node.
+    for (NodeId nodeId : nodeIdList) {
+      if (nodes.containsKey(nodeId)) {
+        FSSchedulerNode node = getFSSchedulerNode(nodeId);
+        try {
+          if (Resources.fitsIn(minimumAllocation,
+              node.getAvailableResource())) {
+            attemptScheduling(node);
+          }
+        } catch (Throwable ex) {
+          LOG.warn("Error while attempting scheduling for node " + node +
+              ": " + ex.toString(), ex);
+        }
+      }
+    }
+    try {
+      Thread.sleep(getContinuousSchedulingSleepMs());
+    } catch (InterruptedException e) {
+      LOG.warn("Error while doing sleep in continuous scheduling: " +
+          e.toString(), e);
+      // Preserve the interrupt status for the calling thread.
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /** Sort nodes by available resource */
   private class NodeAvailableResourceComparator 
       implements Comparator<NodeId> {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ed492ce..32c4833 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2763,7 +2763,56 @@ public void testContinuousScheduling() throws Exception {
     Assert.assertEquals(2, nodes.size());
   }
 
-  
+  @Test (timeout = 10000)
+  public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+    // Continuous scheduling is disabled; a single scheduling pass is
+    // invoked manually below.
+    FairScheduler fs = new FairScheduler();
+    Configuration conf = createConfiguration();
+    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+        false);
+    fs.setRMContext(resourceManager.getRMContext());
+    fs.init(conf);
+    fs.start();
+    fs.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add two nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    fs.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    fs.handle(nodeEvent2);
+    Assert.assertEquals("We should have two alive nodes.",
+        2, fs.getNumClusterNodes());
+
+    List<NodeId> nodeIdList = new ArrayList<NodeId>();
+    nodeIdList.add(node1.getNodeID());
+    nodeIdList.add(node2.getNodeID());
+    Assert.assertEquals("We should have two nodes to be sorted.",
+        2, nodeIdList.size());
+
+    // Remove node1; its id remains in nodeIdList and is now stale.
+    NodeRemovedSchedulerEvent removeNode1 =
+        new NodeRemovedSchedulerEvent(node1);
+    fs.handle(removeNode1);
+    fs.update();
+    Assert.assertEquals("We should only have one alive node.",
+        1, fs.getNumClusterNodes());
+
+    // Invoke the continuous scheduling once: the stale node id must be
+    // skipped rather than cause a NullPointerException.
+    try {
+      fs.oneTimeContinuousScheduling(nodeIdList);
+    } catch (NullPointerException e) {
+      fail("The removed node should be skipped, not cause an NPE: " + e);
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);