diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6c392b5..db4c859 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,7 +25,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -194,6 +196,9 @@ public Configuration getConf() { private ResourceCalculator calculator; private boolean usePortForNodeName; + private boolean scheduleAsynchronously; + private AsyncScheduleThread asyncSchedulerThread; + public CapacityScheduler() {} @Override @@ -272,11 +277,19 @@ public Resource getClusterResources() { initializeQueues(this.conf); + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this, nodes); + asyncSchedulerThread.start(); + } + initialized = true; LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">"); + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + + "asynchronousScheduling=" + scheduleAsynchronously); + } else { CapacitySchedulerConfiguration oldConf = this.conf; this.conf = loadCapacitySchedulerConfiguration(configuration); @@ -291,6 +304,57 @@ public Resource getClusterResources() { } } + static class AsyncScheduleThread extends Thread { + + private final Random rand = new Random(System.currentTimeMillis()); + private final CapacityScheduler cs; + private final Map nodes; + private AtomicBoolean runSchedules = new AtomicBoolean(false); + + public AsyncScheduleThread( + CapacityScheduler cs, Map nodes) { + this.cs = cs; + this.nodes = nodes; + setDaemon(true); + } + + @Override + public void run() { + while (true) { + if (!runSchedules.get()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } else { + // First randomize the start point + int current = 0; + int start = rand.nextInt(nodes.size()); + for (FiCaSchedulerNode node : nodes.values()) { + if (current++ >= start) { + cs.schedule(node); + } + } + // Now, just get everyone to be safe + for (FiCaSchedulerNode node : nodes.values()) { + cs.schedule(node); + } + try { + Thread.sleep(5); + } catch (InterruptedException e) {} + } + } + } + + public void beginSchedule() { + runSchedules.set(true); + } + + public void suspendSchedule() { + runSchedules.set(false); + } + + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; @@ -696,6 +760,9 @@ private synchronized void nodeUpdate(RMNode nm) { LOG.debug("Node being looked for scheduling " + nm + " availableResource: " + node.getAvailableResource()); } + } + + private synchronized void schedule(FiCaSchedulerNode node) { // Assign new containers... // 1. Check for reserved applications @@ -708,7 +775,8 @@ private synchronized void nodeUpdate(RMNode nm) { // Try to fulfill the reservation LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + nm); + reservedApplication.getApplicationId() + " on node: " + + node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); CSAssignment assignment = queue.assignContainers(clusterResource, node); @@ -729,9 +797,16 @@ private synchronized void nodeUpdate(RMNode nm) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - root.assignContainers(clusterResource, node); + if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + node.getAvailableResource(), minimumAllocation)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to schedule on node: " + node.getNodeName() + + ", available: " + node.getAvailableResource()); + } + root.assignContainers(clusterResource, node); + } } else { - LOG.info("Skipping scheduling since node " + nm + + LOG.info("Skipping scheduling since node " + node.getNodeID() + " is reserved by application " + node.getReservedContainer().getContainerId().getApplicationAttemptId() ); @@ -772,7 +847,11 @@ public void handle(SchedulerEvent event) { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode()); + RMNode node = nodeUpdatedEvent.getRMNode(); + nodeUpdate(node); + if (!scheduleAsynchronously) { + schedule(getNode(node.getNodeID())); + } } break; case APP_ADDED: @@ -831,6 +910,10 @@ private synchronized void addNode(RMNode nodeManager) { ++numNodeManagers; LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); + + if (scheduleAsynchronously && numNodeManagers == 1) { + asyncSchedulerThread.beginSchedule(); + } } private synchronized void removeNode(RMNode nodeInfo) { @@ -842,6 +925,10 @@ private synchronized void removeNode(RMNode nodeInfo) { root.updateClusterResource(clusterResource); --numNodeManagers; + if (scheduleAsynchronously && numNodeManagers == 0) { + asyncSchedulerThread.suspendSchedule(); + } + // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 267f819..5037962 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -135,6 +135,10 @@ @Private public static final int DEFAULT_NODE_LOCALITY_DELAY = -1; + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY = + PREFIX + "schedule-asynchronously"; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -357,4 +361,13 @@ public void setResourceComparator( resourceCalculatorClass, ResourceCalculator.class); } + + public boolean getScheduleAynschronously() { + return getBoolean(SCHEDULE_ASYNCHRONOUSLY, true); + } + + public void setScheduleAynschronously(boolean async) { + setBoolean(SCHEDULE_ASYNCHRONOUSLY, async); + } + }