diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a3d5736..18af1fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -181,6 +181,7 @@ protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling + private Comparator nodeAvailableResourceComparator; // Node available resource comparator protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality protected long nodeLocalityDelayMs; // Delay for node locality @@ -946,14 +947,23 @@ private synchronized void nodeUpdate(RMNode nm) { private void continuousScheduling() { while (true) { - for (FSSchedulerNode node : nodes.values()) { - try { - if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { - attemptScheduling(node); + // sort nodes according to available resource + List nodeIdList = new ArrayList(nodes.keySet()); + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + if (nodes.containsKey(nodeId)) { + FSSchedulerNode node = nodes.get(nodeId); + try { + if (Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.warn("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + ": " + - ex.toString(), ex); } } try { @@ -964,6 +974,22 @@ private void continuousScheduling() { } } } + + /** Sorting nodes by available resource */ + private class NodeAvailableResourceComparator implements Comparator { + + @Override + public int compare(NodeId n1, NodeId n2) { + return RESOURCE_CALCULATOR.compare(clusterCapacity, + nodes.get(n2).getAvailableResource(), + nodes.get(n1).getAvailableResource()); + } + + @Override + public boolean equals(Object o) { + return false; + } + } private synchronized void attemptScheduling(FSSchedulerNode node) { // Assign new containers... @@ -1153,6 +1179,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) updateThread.start(); if (continuousSchedulingEnabled) { + nodeAvailableResourceComparator = new NodeAvailableResourceComparator(); // start continuous scheduling thread Thread schedulingThread = new Thread( new Runnable() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e0b81dc..3cc3b67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -37,6 +37,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -60,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -2335,7 +2339,7 @@ public void testConcurrentAccessOnApplications() throws Exception { fs.applications, FSSchedulerApp.class); } - @Test (timeout = 5000) + @Test (timeout = 10000) public void testContinuousScheduling() throws Exception { // set continuous scheduling enabled FairScheduler fs = new FairScheduler(); @@ -2346,16 +2350,21 @@ public void testContinuousScheduling() throws Exception { Assert.assertTrue("Continuous scheduling should be enabled.", fs.isContinuousSchedulingEnabled()); - // Add one node + // Add two nodes RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); fs.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + fs.handle(nodeEvent2); // available resource - Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024); - Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8); + Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024); + Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16); // send application request ApplicationAttemptId appAttemptId = @@ -2374,12 +2383,35 @@ public void testContinuousScheduling() throws Exception { FSSchedulerApp app = fs.applications.get(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } - + // check consumption Assert.assertEquals(1024, app.getCurrentConsumption().getMemory()); Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores()); - } + // another request + request = + createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); + ask.clear(); + ask.add(request); + fs.allocate(appAttemptId, ask, new ArrayList(), null, null); + + Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); + + // Wait until app gets resources. + while (app.getCurrentConsumption() + .equals(Resources.createResource(1024, 1))) { } + + Assert.assertEquals(2048, app.getCurrentConsumption().getMemory()); + Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores()); + + // 2 containers should be assigned to 2 nodes + Set nodes = new HashSet(); + Iterator it = app.getLiveContainers().iterator(); + while (it.hasNext()) { + nodes.add(it.next().getContainer().getNodeId()); + } + Assert.assertEquals(2, nodes.size()); + } @Test public void testDontAllowUndeclaredPools() throws Exception{