diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSBackgroundTasksRunner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSBackgroundTasksRunner.java new file mode 100644 index 0000000..c563522 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSBackgroundTasksRunner.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Clock; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Helper daemon thread to run all FairScheduler background tasks in a single + * thread. + * + * One can add a Runnable to be run along with a duration to wait between + * successive runs. + * + * The thread exits when interrupted. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FSBackgroundTasksRunner extends Thread { + private static class RunnableInfo { + final Runnable r; + final long period; + long nextRun; + + RunnableInfo(Runnable r, long period) { + this.r = r; + this.period = period; + this.nextRun = System.currentTimeMillis(); + } + } + + private static final Log LOG = LogFactory.getLog(FSBackgroundTasksRunner.class); + + private final Clock clock; + private AtomicLong sleepTime; + private final ConcurrentLinkedQueue runnables = + new ConcurrentLinkedQueue(); + + public FSBackgroundTasksRunner(Clock clock) { + this.clock = clock; + this.setName(FSBackgroundTasksRunner.class.getName()); + this.setDaemon(true); + } + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + for (RunnableInfo rInfo : runnables) { + long now = clock.getTime(); + if (rInfo.nextRun < now) { + rInfo.r.run(); + now = clock.getTime(); + //TODO: Capture time taken by r.run() as a metric. + rInfo.nextRun = now + rInfo.period; + } + } + + try { + Thread.sleep(sleepTime.get()); + } catch (InterruptedException e) { + LOG.warn(this.getName() + " interrupted. Exiting!"); + return; + } + } + } + + public void addRunnableToRun(Runnable r, long period) { + if (this.sleepTime == null) { + this.sleepTime = new AtomicLong(period); + } else { + this.sleepTime.set(gcd(period, sleepTime.get())); + } + + this.runnables.add(new RunnableInfo(r, period)); + } + + private static long gcd(long a, long b) + { + while (b > 0) + { + long temp = b; + b = a % b; + a = temp; + } + return a; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 27a0075..039fbe2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -122,7 +122,7 @@ private Resource incrAllocation; private QueueManager queueMgr; - private Clock clock; + private Clock clock = new SystemClock(); private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -139,8 +139,10 @@ private final int UPDATE_DEBUG_FREQUENCY = 5; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - private Thread updateThread; - private Thread schedulingThread; + + // Thread to run all background tasks for the scheduler + @VisibleForTesting + FSBackgroundTasksRunner backgroundTasksRunner; // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -190,7 +192,6 @@ public FairScheduler() { super(FairScheduler.class.getName()); - clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -243,24 +244,34 @@ public QueueManager getQueueManager() { } /** - * A runnable which calls {@link FairScheduler#update()} every - * updateInterval milliseconds. + * A runnable which calls {@link FairScheduler#update()} and preempts tasks + * if necessary. */ - private class UpdateThread implements Runnable { + private class UpdateRunnable implements Runnable { + + @Override public void run() { - while (true) { - try { - Thread.sleep(updateInterval); - update(); - preemptTasksIfNecessary(); - } catch (Exception e) { - LOG.error("Exception in fair scheduler UpdateThread", e); - } + try { + update(); + preemptTasksIfNecessary(); + } catch (Exception e) { + LOG.error("Exception in fair scheduler UpdateThread", e); } } } /** + * A runnable that attempts scheduling. + */ + private class ContinuousSchedulingRunnable implements Runnable { + + @Override + public void run() { + continuousSchedulingAttempt(); + } + } + + /** * Recompute the internal variables used by the scheduler - per-job weights, * fair shares, deficits, minimum slot allocations, and amount of used and * required resources per job. @@ -548,12 +559,16 @@ public synchronized int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public synchronized Clock getClock() { - return clock; + public Clock getClock() { + synchronized (clock) { + return clock; + } } - protected synchronized void setClock(Clock clock) { - this.clock = clock; + protected void setClock(Clock clock) { + synchronized (clock) { + this.clock = clock; + } } public FairSchedulerEventLog getEventLog() { @@ -1229,33 +1244,14 @@ private synchronized void initScheduler(Configuration conf) throw new IOException("Failed to start FairScheduler", e); } - updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - + backgroundTasksRunner = new FSBackgroundTasksRunner(clock); + backgroundTasksRunner.addRunnableToRun(new UpdateRunnable(), updateInterval); if (continuousSchedulingEnabled) { - // start continuous scheduling thread - schedulingThread = new Thread( - new Runnable() { - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { - continuousSchedulingAttempt(); - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.error("Continuous scheduling thread interrupted. Exiting. ", - e); - return; - } - } - } - } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); + backgroundTasksRunner.addRunnableToRun( + new ContinuousSchedulingRunnable(), getContinuousSchedulingSleepMs()); } + //TODO: Move allocsLoader also to background threads allocsLoader.init(conf); allocsLoader.setReloadListener(new AllocationReloadListener()); // If we fail to load allocations file on initialize, we want to fail @@ -1269,13 +1265,9 @@ public void run() { } private synchronized void startSchedulerThreads() { - Preconditions.checkNotNull(updateThread, "updateThread is null"); + Preconditions.checkNotNull(backgroundTasksRunner, "updateThread is null"); Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); - updateThread.start(); - if (continuousSchedulingEnabled) { - Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); - schedulingThread.start(); - } + backgroundTasksRunner.start(); allocsLoader.start(); } @@ -1294,15 +1286,9 @@ public void serviceStart() throws Exception { @Override public void serviceStop() throws Exception { synchronized (this) { - if (updateThread != null) { - updateThread.interrupt(); - updateThread.join(THREAD_JOIN_TIMEOUT_MS); - } - if (continuousSchedulingEnabled) { - if (schedulingThread != null) { - schedulingThread.interrupt(); - schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); - } + if (backgroundTasksRunner != null) { + backgroundTasksRunner.interrupt(); + backgroundTasksRunner.join(THREAD_JOIN_TIMEOUT_MS); } if (allocsLoader != null) { allocsLoader.stop(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index df157e7..7f77b01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -132,8 +132,16 @@ public void setUp() throws IOException { @After public void tearDown() { - scheduler = null; - resourceManager = null; + if (scheduler != null) { + scheduler.stop(); + scheduler = null; + } + + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + QueueMetrics.clearQueueMetrics(); DefaultMetricsSystem.shutdown(); }