diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 14ec99c..3c62cce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -316,10 +316,18 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { + localRequest); } - NodeType allowedLocality = app.getAllowedLocalityLevel(priority, - scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(), - scheduler.getRackLocalityThreshold()); - + NodeType allowedLocality; + if (scheduler.isContinuousSchedulingEnabled()) { + allowedLocality = app.getAllowedLocalityLevelByTime(priority, + scheduler.getNodeLocalityThresholdTimeMs(), + scheduler.getRackLocalityThresholdTimeMs()); + } else { + allowedLocality = app.getAllowedLocalityLevel(priority, + scheduler.getNumClusterNodes(), + scheduler.getNodeLocalityThreshold(), + scheduler.getRackLocalityThreshold()); + } + if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { return assignContainer(node, priority, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 670e961..9390194 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -513,6 +513,54 @@ else if (allowed.equals(NodeType.RACK_LOCAL)) { return allowedLocalityLevel.get(priority); } + /** + * Return the level at which we are allowed to schedule containers. + * Given the thresholds indicating how much time passed before relaxing + * scheduling constraints. + */ + public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, + long nodeLocalityThresholdTimeMs, long rackLocalityThresholdTimeMs) { + + // if not being used, can schedule anywhere + if (nodeLocalityThresholdTimeMs < 0 || rackLocalityThresholdTimeMs < 0) { + return NodeType.OFF_SWITCH; + } + + // default level is NODE_LOCAL + if (! allowedLocalityLevel.containsKey(priority)) { + allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } + + NodeType allowed = allowedLocalityLevel.get(priority); + + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } + + // check waiting time + long waitTime = System.currentTimeMillis(); + if (lastScheduledContainer.containsKey(priority)) { + waitTime -= lastScheduledContainer.get(priority); + } else { + waitTime -= appSchedulable.getStartTime(); + } + + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityThresholdTimeMs : rackLocalityThresholdTimeMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(priority); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(priority); + } + } + return allowedLocalityLevel.get(priority); + } synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 16e7fd6..b874993 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -179,8 +179,12 @@ protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster + protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not + protected int continuousSchedulingSleepTimeMs; // Sleep time for each pass in continuous scheduling protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality + protected long nodeLocalityThresholdTimeMs; // Cluster waiting time threshold for node locality + protected long rackLocalityThresholdTimeMs; // Cluster waiting time threshold for rack locality private FairSchedulerEventLog eventLog; // Machine-readable event log protected boolean assignMultiple; // Allocate multiple containers per // heartbeat @@ -582,6 +586,22 @@ public double getRackLocalityThreshold() { return rackLocalityThreshold; } + public long getNodeLocalityThresholdTimeMs() { + return nodeLocalityThresholdTimeMs; + } + + public long getRackLocalityThresholdTimeMs() { + return rackLocalityThresholdTimeMs; + } + + public boolean isContinuousSchedulingEnabled() { + return continuousSchedulingEnabled; + } + + public synchronized int getContinuousSchedulingSleepTimeMs() { + return continuousSchedulingSleepTimeMs; + } + public Resource getClusterCapacity() { return clusterCapacity; } @@ -907,6 +927,37 @@ private synchronized void nodeUpdate(RMNode nm) { completedContainer, RMContainerEventType.FINISHED); } + if (continuousSchedulingEnabled) { + if (!completedContainers.isEmpty()) { + attemptScheduling(node); + } + } else { + attemptScheduling(node); + } + } + + private void continuousScheduling() { + while (true) { + for (FSSchedulerNode node : nodes.values()) { + try { + if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.warn("Error while attempting scheduling for node " + node + ": " + + ex.toString(), ex); + } + } + try { + Thread.sleep(getContinuousSchedulingSleepTimeMs()); + } catch (InterruptedException e) { + LOG.warn("Error while doing sleep in continuous scheduling: " + + e.toString(), e); + } + } + } + + private synchronized void attemptScheduling(FSSchedulerNode node) { // Assign new containers... // 1. Check for reserved applications // 2. Schedule if there are no reservations @@ -914,19 +965,18 @@ private synchronized void nodeUpdate(RMNode nm) { AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - if (reservedAppSchedulable != null && - !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { + if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node " + nm); + + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; } else { // Reservation exists; try to fulfill the reservation LOG.info("Trying to fulfill reservation for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() - + " on node: " + nm); + + " on node: " + node); node.getReservedAppSchedulable().assignReservedContainer(node); } @@ -1060,8 +1110,13 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) maximumAllocation = this.conf.getMaximumAllocation(); incrAllocation = this.conf.getIncrementAllocation(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepTimeMs = + this.conf.getContinuousSchedulingSleepTimeMs(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityThresholdTimeMs = this.conf.getLocalityThresholdNodeTimeMs(); + rackLocalityThresholdTimeMs = this.conf.getLocalityThresholdRackTimeMs(); preemptionEnabled = this.conf.getPreemptionEnabled(); assignMultiple = this.conf.getAssignMultiple(); maxAssign = this.conf.getMaxAssign(); @@ -1088,6 +1143,21 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) updateThread.setName("FairSchedulerUpdateThread"); updateThread.setDaemon(true); updateThread.start(); + + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + Thread schedulingThread = new Thread( + new Runnable() { + @Override + public void run() { + continuousScheduling(); + } + } + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + schedulingThread.start(); + } } else { try { queueMgr.reloadAllocs(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index acdd40e..0c3ac7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -66,6 +66,22 @@ protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK = DEFAULT_LOCALITY_THRESHOLD; + /** Time threshold for node locality. */ + protected static final String LOCALITY_THRESHOLD_NODE_TIME_MS = CONF_PREFIX + "locality.threshold.node.time.ms"; + protected static final long DEFAULT_LOCALITY_THRESHOLD_NODE_TIME_MS = -1L; + + /** Time threshold for rack locality. */ + protected static final String LOCALITY_THRESHOLD_RACK_TIME_MS = CONF_PREFIX + "locality.threshold.rack.time.ms"; + protected static final long DEFAULT_LOCALITY_THRESHOLD_RACK_TIME_MS = -1L; + + /** Enable continuous scheduling or not. */ + protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX + "continuous.scheduling.enabled"; + protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false; + + /** Sleep time of each pass in continuous scheduling (5ms in default) */ + protected static final String CONTINUOUS_SCHEDULING_SLEEP_TIME_MS = CONF_PREFIX + "continuous.scheduling.sleep.time.ms"; + protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_TIME_MS = 5; + /** Whether preemption is enabled. */ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; @@ -134,6 +150,22 @@ public float getLocalityThresholdRack() { return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK); } + public boolean isContinuousSchedulingEnabled() { + return getBoolean(CONTINUOUS_SCHEDULING_ENABLED, DEFAULT_CONTINUOUS_SCHEDULING_ENABLED); + } + + public int getContinuousSchedulingSleepTimeMs() { + return getInt(CONTINUOUS_SCHEDULING_SLEEP_TIME_MS, DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_TIME_MS); + } + + public long getLocalityThresholdNodeTimeMs() { + return getLong(LOCALITY_THRESHOLD_NODE_TIME_MS, DEFAULT_LOCALITY_THRESHOLD_NODE_TIME_MS); + } + + public long getLocalityThresholdRackTimeMs() { + return getLong(LOCALITY_THRESHOLD_RACK_TIME_MS, DEFAULT_LOCALITY_THRESHOLD_RACK_TIME_MS); + } + public boolean getPreemptionEnabled() { return getBoolean(PREEMPTION, DEFAULT_PREEMPTION); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java index 8a53bd0..86a368d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java @@ -94,6 +94,61 @@ public void testDelayScheduling() { } @Test + public void testDelaySchedulingForContinuousScheduling() + throws InterruptedException { + Queue queue = Mockito.mock(Queue.class); + Priority prio = Mockito.mock(Priority.class); + Mockito.when(prio.getPriority()).thenReturn(1); + + long nodeLocalityThresholdTimeMs = 5 * 1000L; // 5 seconds + long rackLocalityThresholdTimeMs = 6 * 1000L; // 6 seconds + + ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1); + FSSchedulerApp schedulerApp = + new FSSchedulerApp(applicationAttemptId, "user1", queue, + null, null); + AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class); + Mockito.when(appSchedulable.getStartTime()) + .thenReturn(System.currentTimeMillis()); + schedulerApp.setAppSchedulable(appSchedulable); + + // Default level should be node-local + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + + // after 4 seconds should remain node local + Thread.sleep(4000); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + + // after 6 seconds should switch to rack local + Thread.sleep(2000); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + + // manually set back to node local + schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL); + schedulerApp.resetSchedulingOpportunities(prio); + assertEquals(NodeType.NODE_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + + // Now escalate again to rack-local, then to off-switch + Thread.sleep(5500); + assertEquals(NodeType.RACK_LOCAL, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + + Thread.sleep(6500); + assertEquals(NodeType.OFF_SWITCH, + schedulerApp.getAllowedLocalityLevelByTime(prio, + nodeLocalityThresholdTimeMs, rackLocalityThresholdTimeMs)); + } + + @Test /** * Ensure that when negative paramaters are given (signaling delay scheduling * no tin use), the least restrictive locality level is returned. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5bede95..6305d5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -49,15 +49,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.MockApps; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -298,6 +290,14 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); + conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, + true); + conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_TIME_MS, + 10); + conf.setInt(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK_TIME_MS, + 5000); + conf.setInt(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE_TIME_MS, + 5000); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, @@ -308,6 +308,11 @@ public void testLoadConfigurationOnInitialize() throws IOException { Assert.assertEquals(true, scheduler.sizeBasedWeight); Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01); Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); + Assert.assertTrue("The continuous scheduling should be enabled", + scheduler.continuousSchedulingEnabled); + Assert.assertEquals(10, scheduler.continuousSchedulingSleepTimeMs); + Assert.assertEquals(5000, scheduler.nodeLocalityThresholdTimeMs); + Assert.assertEquals(5000, scheduler.rackLocalityThresholdTimeMs); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); Assert.assertEquals(128, @@ -2217,4 +2222,46 @@ public void testConcurrentAccessOnApplications() throws Exception { fs.applications, FSSchedulerApp.class); } + @Test + public void testContinuousScheduling() throws Exception { + // set continuous scheduling enabled + FairScheduler fs = new FairScheduler(); + Configuration conf = createConfiguration(); + conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, + true); + fs.reinitialize(conf, resourceManager.getRMContext()); + Assert.assertTrue("Continuous scheduling should be enabled.", + fs.isContinuousSchedulingEnabled()); + + // Add one node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + fs.handle(nodeEvent1); + + // available resource + Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024); + Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8); + + // send application request + ApplicationAttemptId appAttemptId = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + fs.addApplication(appAttemptId, "queue1", "user1"); + List ask = new ArrayList(); + ResourceRequest request = + createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); + ask.add(request); + fs.allocate(appAttemptId, ask, new ArrayList(), null, null); + + // waiting for continuous_scheduler_sleep_time + Thread.sleep(fs.getConf().getContinuousSchedulingSleepTimeMs() + 500); + + // check consumption + Resource consumption = + fs.applications.get(appAttemptId).getCurrentConsumption(); + Assert.assertEquals(1024, consumption.getMemory()); + Assert.assertEquals(1, consumption.getVirtualCores()); + } + }