diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a8a47c9..fe48d16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -187,7 +189,10 @@ public Configuration getConf() { new ConcurrentHashMap(); private boolean initialized = false; - + + private boolean scheduleAsynchronously; + private AsyncScheduleThread asyncSchedulerThread; + private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -267,6 +272,13 @@ public Resource getClusterResources() { initializeQueues(this.conf); + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this.rmContext, this); + asyncSchedulerThread.start(); + } + LOG.info("scheduleAsynchronously : " + scheduleAsynchronously); + initialized = true; LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + @@ -287,6 +299,47 @@ public Resource getClusterResources() { } } + static class AsyncScheduleThread extends Thread { + + private final RMContext rmContext; + private final CapacityScheduler cs; + private AtomicBoolean runSchedules = new AtomicBoolean(false); + + public AsyncScheduleThread(RMContext rmContext, CapacityScheduler cs) { + this.cs = cs; + this.rmContext = rmContext; + setDaemon(true); + } + + @Override + public void run() { + while (true) { + if (!runSchedules.get()) { + try { + Thread.sleep(1000); + } catch (InterruptedException ie) {} + } else { + ConcurrentMap nodes = rmContext.getRMNodes(); + for (RMNode node : nodes.values()) { + cs.schedule(node); + } + try { + Thread.sleep(5); + } catch (InterruptedException e) {} + } + } + } + + public void beginSchedule() { + runSchedules.set(true); + } + + public void suspendSchedule() { + runSchedules.set(false); + } + + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; @@ -655,10 +708,27 @@ private synchronized void nodeUpdate(RMNode nm) { + " availableResource: " + node.getAvailableResource()); } + } + + private int scheduleCallsCounter = 0; + private synchronized void schedule(RMNode nm) { + + if ((++scheduleCallsCounter % 100000) == 0) { + LOG.info("schedule: " + scheduleCallsCounter + " invocations"); + } + // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations - + FiCaSchedulerNode node = getNode(nm.getNodeID()); + + // TODO: Better Check to see if node has successfully registered or not... + if (node == null) { + LOG.info("Node " + nm.getNodeID() + + " hasn't successfully registered with CS yet."); + return; + } + RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { FiCaSchedulerApp reservedApplication = @@ -687,7 +757,14 @@ private synchronized void nodeUpdate(RMNode nm) { // Try to schedule more if there are no reservations to fulfill if (node.getReservedContainer() == null) { - root.assignContainers(clusterResource, node); + if (Resources.greaterThanOrEqual(calculator, getClusterResources(), + node.getAvailableResource(), minimumAllocation)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to schedule on node: " + node.getNodeName() + + ", available: " + node.getAvailableResource()); + } + root.assignContainers(clusterResource, node); + } } else { LOG.info("Skipping scheduling since node " + nm + " is reserved by application " + @@ -731,7 +808,11 @@ public void handle(SchedulerEvent event) { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode()); + RMNode nm = nodeUpdatedEvent.getRMNode(); + nodeUpdate(nm); + if (!scheduleAsynchronously) { + schedule(nm); + } } break; case APP_ADDED: @@ -773,6 +854,10 @@ private synchronized void addNode(RMNode nodeManager) { ++numNodeManagers; LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); + + if (scheduleAsynchronously && numNodeManagers == 1) { + asyncSchedulerThread.beginSchedule(); + } } private synchronized void removeNode(RMNode nodeInfo) { @@ -783,6 +868,11 @@ private synchronized void removeNode(RMNode nodeInfo) { Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); root.updateClusterResource(clusterResource); --numNodeManagers; + + if (scheduleAsynchronously && numNodeManagers == 0) { + asyncSchedulerThread.suspendSchedule(); + } + // Remove running containers List runningContainers = node.getRunningContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fceabf..d3e1289 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -133,6 +133,10 @@ PREFIX + "node-locality-delay"; @Private + public static final String SCHEDULE_ASYNCHRONOUSLY = + PREFIX + "schedule-asynchronously"; + + @Private public static final int DEFAULT_NODE_LOCALITY_DELAY = -1; public CapacitySchedulerConfiguration() { @@ -350,4 +354,12 @@ public void setResourceComparator( resourceCalculatorClass, ResourceCalculator.class); } + + public boolean getScheduleAynschronously() { + return getBoolean(SCHEDULE_ASYNCHRONOUSLY, true); + } + + public void setScheduleAynschronously(boolean async) { + setBoolean(SCHEDULE_ASYNCHRONOUSLY, async); + } }