diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0fac0b9..0fa1a9e 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -172,6 +172,12 @@
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6c392b5..7207d32 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,11 +21,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -194,6 +197,9 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;
+ private boolean scheduleAsynchronously;
+ private AsyncScheduleThread asyncSchedulerThread;
+
public CapacityScheduler() {}
@Override
@@ -272,11 +278,19 @@ public Resource getClusterResources() {
initializeQueues(this.conf);
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ if (scheduleAsynchronously) {
+ asyncSchedulerThread = new AsyncScheduleThread(this);
+ asyncSchedulerThread.start();
+ }
+
initialized = true;
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">");
+ "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+ "asynchronousScheduling=" + scheduleAsynchronously);
+
} else {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = loadCapacitySchedulerConfiguration(configuration);
@@ -291,6 +305,65 @@ public Resource getClusterResources() {
}
}
+ private static final Random random = new Random(System.currentTimeMillis());
+
+ /**
+  * Runs one scheduling round over the nodes returned by
+  * {@code cs.getAllNodes()}.
+  *
+  * The round first offers the nodes from a randomly chosen position in the
+  * iteration order through to the end, then offers every node once more, so
+  * nodes at or after the random position are offered twice per call. The
+  * method then sleeps for 5 ms.
+  *
+  * When no node is registered the scheduling passes are skipped, because
+  * {@code Random.nextInt(0)} would throw IllegalArgumentException; the sleep
+  * still happens so a caller looping on this method does not spin.
+  *
+  * If the sleep is interrupted, the interrupt status of the calling thread is
+  * restored before returning.
+  *
+  * @param cs scheduler whose nodes are offered for container assignment
+  */
+ static void schedule(CapacityScheduler cs) {
+   Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
+   int nodeCount = nodes.size();
+   if (nodeCount > 0) {
+     // First randomize the start point
+     int current = 0;
+     int start = random.nextInt(nodeCount);
+     for (FiCaSchedulerNode node : nodes) {
+       if (current++ >= start) {
+         cs.schedule(node);
+       }
+     }
+     // Now, just get everyone to be safe
+     for (FiCaSchedulerNode node : nodes) {
+       cs.schedule(node);
+     }
+   }
+   try {
+     Thread.sleep(5);
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the caller instead of swallowing it.
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Daemon thread that keeps calling {@link #schedule(CapacityScheduler)}
+  * while scheduling is enabled through {@link #beginSchedule()}, and polls
+  * once per second while it is suspended. The thread starts suspended and
+  * leaves its loop once it has been interrupted.
+  */
+ static class AsyncScheduleThread extends Thread {
+
+   private final CapacityScheduler cs;
+   // Toggled by beginSchedule()/suspendSchedule(), read by run().
+   private final AtomicBoolean runSchedules = new AtomicBoolean(false);
+
+   /**
+    * @param cs scheduler passed to every scheduling round
+    */
+   public AsyncScheduleThread(CapacityScheduler cs) {
+     this.cs = cs;
+     setDaemon(true);
+   }
+
+   @Override
+   public void run() {
+     while (!Thread.currentThread().isInterrupted()) {
+       if (!runSchedules.get()) {
+         try {
+           Thread.sleep(1000);
+         } catch (InterruptedException ie) {
+           // Restore the flag so the loop condition ends the thread.
+           Thread.currentThread().interrupt();
+         }
+       } else {
+         schedule(cs);
+       }
+     }
+   }
+
+   /** Enables the scheduling rounds performed by {@link #run()}. */
+   public void beginSchedule() {
+     runSchedules.set(true);
+   }
+
+   /** Pauses the scheduling rounds; {@link #run()} goes back to polling. */
+   public void suspendSchedule() {
+     runSchedules.set(false);
+   }
+
+ }
+
@Private
public static final String ROOT_QUEUE =
CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT;
@@ -696,6 +769,9 @@ private synchronized void nodeUpdate(RMNode nm) {
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
+ }
+
+ private synchronized void schedule(FiCaSchedulerNode node) {
// Assign new containers...
// 1. Check for reserved applications
@@ -708,7 +784,8 @@ private synchronized void nodeUpdate(RMNode nm) {
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
- reservedApplication.getApplicationId() + " on node: " + nm);
+ reservedApplication.getApplicationId() + " on node: " +
+ node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node);
@@ -729,9 +806,16 @@ private synchronized void nodeUpdate(RMNode nm) {
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
- root.assignContainers(clusterResource, node);
+ if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+ node.getAvailableResource(), minimumAllocation)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to schedule on node: " + node.getNodeName() +
+ ", available: " + node.getAvailableResource());
+ }
+ root.assignContainers(clusterResource, node);
+ }
} else {
- LOG.info("Skipping scheduling since node " + nm +
+ LOG.info("Skipping scheduling since node " + node.getNodeID() +
" is reserved by application " +
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
@@ -772,7 +856,11 @@ public void handle(SchedulerEvent event) {
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
- nodeUpdate(nodeUpdatedEvent.getRMNode());
+ RMNode node = nodeUpdatedEvent.getRMNode();
+ nodeUpdate(node);
+ if (!scheduleAsynchronously) {
+ schedule(getNode(node.getNodeID()));
+ }
}
break;
case APP_ADDED:
@@ -831,6 +919,10 @@ private synchronized void addNode(RMNode nodeManager) {
++numNodeManagers;
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
+
+ if (scheduleAsynchronously && numNodeManagers == 1) {
+ asyncSchedulerThread.beginSchedule();
+ }
}
private synchronized void removeNode(RMNode nodeInfo) {
@@ -842,6 +934,10 @@ private synchronized void removeNode(RMNode nodeInfo) {
root.updateClusterResource(clusterResource);
--numNodeManagers;
+ if (scheduleAsynchronously && numNodeManagers == 0) {
+ asyncSchedulerThread.suspendSchedule();
+ }
+
// Remove running containers
List runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
@@ -930,7 +1026,12 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
+
+ /**
+  * Returns the scheduler's node map itself, not a copy, so callers observe
+  * later additions and removals.
+  *
+  * @return the map held in the {@code nodes} field
+  */
+ @Lock(Lock.NoLock.class)
+ Map<NodeId, FiCaSchedulerNode> getAllNodes() {
+   return nodes;
+ }
+
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 267f819..0841823 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -135,6 +135,13 @@
@Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
+ /**
+  * Configuration key for asynchronous scheduling, formed by appending
+  * "schedule-asynchronously" to {@code PREFIX}.
+  */
+ @Private
+ public static final String SCHEDULE_ASYNCHRONOUSLY = PREFIX + "schedule-asynchronously";
+
+ /** Default passed to getBoolean by getScheduleAynschronously(). */
+ @Private
+ public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY = false;
+
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@@ -357,4 +364,13 @@ public void setResourceComparator(
resourceCalculatorClass,
ResourceCalculator.class);
}
+
+ /**
+  * Looks up the asynchronous-scheduling flag. The misspelled method name is
+  * kept as-is because callers reference it.
+  *
+  * @return the result of getBoolean for {@code SCHEDULE_ASYNCHRONOUSLY},
+  *         with {@code DEFAULT_SCHEDULE_ASYNCHRONOUSLY} as the default
+  */
+ public boolean getScheduleAynschronously() {
+   final boolean asyncEnabled =
+       this.getBoolean(SCHEDULE_ASYNCHRONOUSLY, DEFAULT_SCHEDULE_ASYNCHRONOUSLY);
+   return asyncEnabled;
+ }
+
+ /**
+  * Stores the asynchronous-scheduling flag under
+  * {@code SCHEDULE_ASYNCHRONOUSLY}. The misspelled method name is kept as-is
+  * to match its getter.
+  *
+  * @param async value written through setBoolean
+  */
+ public void setScheduleAynschronously(boolean async) {
+   this.setBoolean(SCHEDULE_ASYNCHRONOUSLY, async);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 47ec546..a48adea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -19,20 +19,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@@ -61,11 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -650,4 +642,30 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {
cs.getSchedulerApplications(), cs, "a1");
Assert.assertEquals("a1", app.getQueue().getQueueName());
}
- }
+
+ /**
+  * Registers 100 nodes with a CapacityScheduler and then calls the static
+  * scheduling loop directly, once per registered node. The test passes when
+  * no exception escapes; it makes no further assertions.
+  */
+ @Test
+ public void testAsyncScheduling() throws Exception {
+   Configuration conf = new Configuration();
+   conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+       ResourceScheduler.class);
+   MockRM rm = new MockRM(conf);
+   rm.start();
+   try {
+     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+     final int nodeCount = 100;
+
+     // Register nodes
+     for (int i = 0; i < nodeCount; ++i) {
+       String host = "192.168.1." + i;
+       RMNode node =
+           MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+       cs.handle(new NodeAddedSchedulerEvent(node));
+     }
+
+     // Now directly exercise the scheduling loop
+     for (int i = 0; i < nodeCount; ++i) {
+       CapacityScheduler.schedule(cs);
+     }
+   } finally {
+     // Stop the RM even when scheduling throws, so it does not outlive the test.
+     rm.stop();
+   }
+ }
+
+}