diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index f923733..3280484 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -87,11 +88,12 @@ import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; -public class ResourceSchedulerWrapper implements - SchedulerWrapper,ResourceScheduler,Configurable { +public class ResourceSchedulerWrapper extends AbstractYarnScheduler + implements SchedulerWrapper, ResourceScheduler, Configurable { private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; + private RMContext rmContext; // counters for scheduler allocate/handle operations private Counter schedulerAllocateCounter; private Counter schedulerHandleCounter; @@ -144,6 +146,7 @@ public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); public ResourceSchedulerWrapper() { + super(ResourceSchedulerWrapper.class.getName()); samplerLock = new ReentrantLock(); queueLock = new ReentrantLock(); } @@ -793,9 +796,29 @@ public Configuration getConf() { } @Override - public void reinitialize(Configuration entries, RMContext rmContext) - throws IOException { - scheduler.reinitialize(entries, rmContext); + public void init(Configuration conf) { + ((AbstractYarnScheduler) scheduler).init(conf); + } + + @Override + public void start() { + ((AbstractYarnScheduler) scheduler).start(); + } + + @Override + public void stop() { + ((AbstractYarnScheduler) scheduler).stop(); + } + + @Override + public void setRMContext(RMContext rmContext) { + scheduler.setRMContext(rmContext); + } + + @Override + public void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + scheduler.reinitialize(conf, rmContext); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b914b1f..b62bd5f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -401,6 +401,8 @@ protected void serviceInit(Configuration configuration) throws Exception { // Initialize the scheduler scheduler = createScheduler(); + scheduler.setRMContext(rmContext); + addIfService(scheduler); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); @@ -429,12 +431,6 @@ protected void serviceInit(Configuration configuration) throws Exception { DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - // creating monitors that handle preemption createPolicyMonitors(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..3124c17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -33,7 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; -public abstract class AbstractYarnScheduler implements ResourceScheduler { +public abstract class AbstractYarnScheduler extends AbstractService + implements ResourceScheduler { protected RMContext rmContext; protected Map applications; @@ -42,6 +44,15 @@ protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + /** + * Construct the service. + * + * @param name service name + */ + public AbstractYarnScheduler(String name) { + super(name); + } + public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 8840881..5649ccf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -34,6 +34,15 @@ @LimitedPrivate("yarn") @Evolving public interface ResourceScheduler extends YarnScheduler, Recoverable { + + /** + * Set RMContext for ResourceScheduler. + * This method should be called immediately after instantiating + * a scheduler once. + * @param rmContext created by ResourceManager + */ + void setRMContext(RMContext rmContext); + /** * Re-initialize the ResourceScheduler. * @param conf configuration diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..1fbd9ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -107,6 +108,8 @@ private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private CSQueue root; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; static final Comparator queueComparator = new Comparator() { @Override @@ -209,7 +212,9 @@ public Configuration getConf() { + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - public CapacityScheduler() {} + public CapacityScheduler() { + super(CapacityScheduler.class.getName()); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -269,41 +274,79 @@ public RMContext getRMContext() { public Resource getClusterResources() { return clusterResource; } - + + @Override + public void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + private void initScheduler(Configuration conf) throws IOException { + Configuration configuration = new Configuration(conf); + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + this.maximumAllocation = this.conf.getMaximumAllocation(); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = + new ConcurrentHashMap(); + + initializeQueues(this.conf); + + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = + this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this); + } + + initialized = true; + LOG.info("Initialized CapacityScheduler with " + + "calculator=" + getResourceCalculator().getClass() + ", " + + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + + "asynchronousScheduling=" + scheduleAsynchronously + ", " + + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + } + + private void startSchedulerThreads() { + if (scheduleAsynchronously) { + Preconditions.checkNotNull(asyncSchedulerThread, + "asyncSchedulerThread is null"); + asyncSchedulerThread.start(); + } + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + if (scheduleAsynchronously && asyncSchedulerThread != null) { + asyncSchedulerThread.interrupt(); + asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); + } + super.serviceStop(); + } + @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { Configuration configuration = new Configuration(conf); if (!initialized) { this.rmContext = rmContext; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap(); - - initializeQueues(this.conf); - - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = - this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - if (scheduleAsynchronously) { - asyncSchedulerThread = new AsyncScheduleThread(this); - asyncSchedulerThread.start(); - } - - initialized = true; - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + - "asynchronousScheduling=" + scheduleAsynchronously + ", " + - "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); - + initScheduler(conf); + startSchedulerThreads(); } else { CapacitySchedulerConfiguration oldConf = this.conf; this.conf = loadCapacitySchedulerConfiguration(configuration); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index bedbb64..6c35630 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -68,7 +68,9 @@ * (this is done to prevent loading a file that hasn't been fully written). */ public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000; - + + public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues @@ -146,7 +148,14 @@ public void run() { @Override public void stop() { running = false; - reloadThread.interrupt(); + if (reloadThread != null) { + reloadThread.interrupt(); + try { + reloadThread.join(THREAD_JOIN_TIMEOUT_MS); + } catch (InterruptedException e) { + LOG.warn("reloadThread fails to join."); + } + } super.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..0c25fbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -144,6 +145,11 @@ // How often fair shares are re-calculated (ms) protected long UPDATE_INTERVAL = 500; + private Thread updateThread; + private Thread schedulingThread; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + // Aggregate metrics FSQueueMetrics rootMetrics; @@ -194,6 +200,7 @@ AllocationConfiguration allocConf; public FairScheduler() { + super(FairScheduler.class.getName()); clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); @@ -519,7 +526,8 @@ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { return resToPreempt; } - public RMContainerTokenSecretManager getContainerTokenSecretManager() { + public synchronized RMContainerTokenSecretManager + getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } @@ -1231,77 +1239,128 @@ public void recover(RMState state) throws Exception { } @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - if (!initialized) { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); - - initialized = true; - - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + private void initScheduler(Configuration conf) throws IOException { + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + // This stores per-application scheduling information + this.applications = + new ConcurrentHashMap(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); + + initialized = true; + + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); + updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + schedulingThread = new Thread( new Runnable() { @Override public void run() { continuousScheduling(); } } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + } + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + } + + private void startSchedulerThreads() { + Preconditions.checkNotNull(updateThread, "updateThread is null"); + Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); + updateThread.start(); + if (continuousSchedulingEnabled) { + Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); + schedulingThread.start(); + } + allocsLoader.start(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + synchronized (this) { + if (updateThread != null) { + updateThread.interrupt(); + updateThread.join(THREAD_JOIN_TIMEOUT_MS); } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); + if (continuousSchedulingEnabled) { + if (schedulingThread != null) { + schedulingThread.interrupt(); + schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); + } + } + if (allocsLoader != null) { + allocsLoader.stop(); } - allocsLoader.start(); + } + + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + if (!initialized) { + this.rmContext = rmContext; + initScheduler(conf); + startSchedulerThreads(); } else { try { allocsLoader.reloadAllocations(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 21fcdec..25428a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -93,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -187,6 +188,48 @@ public ActiveUsersManager getActiveUsersManager() { } }; + public FifoScheduler() { + super(FifoScheduler.class.getName()); + } + + private void initScheduler(Configuration conf) { + validateConf(conf); + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap(); + this.minimumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + this.maximumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, + conf); + this.activeUsersManager = new ActiveUsersManager(metrics); + this.initialized = true; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + @Override public synchronized void setConf(Configuration conf) { this.conf = conf; @@ -233,35 +276,21 @@ public Resource getMaximumResourceCapability() { } @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { setConf(conf); if (!this.initialized) { - validateConf(conf); + initScheduler(conf); this.rmContext = rmContext; - //Use ConcurrentSkipListMap because applications need to be ordered - this.applications = - new ConcurrentSkipListMap(); - this.minimumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - this.usePortForNodeName = conf.getBoolean( - YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, - conf); - this.activeUsersManager = new ActiveUsersManager(metrics); - this.initialized = true; } } - @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index bdf89bb..e30e657 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -114,7 +114,7 @@ public void setUp() throws Exception { setupQueueConfiguration(csConf, newRoot); YarnConfiguration conf = new YarnConfiguration(); cs.setConf(conf); - + csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -142,7 +142,9 @@ public void setUp() throws Exception { queues, queues, TestUtils.spyHook); - cs.reinitialize(csConf, rmContext); + cs.setRMContext(rmContext); + cs.init(csConf); + cs.start(); } private static final String A = "a"; @@ -2080,5 +2082,8 @@ private CapacitySchedulerContext mockCSContext( @After public void tearDown() throws Exception { + if (cs != null) { + cs.stop(); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f6dfc3f..d074f2a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -148,6 +148,7 @@ public void testAppAttemptMetrics() throws Exception { null, null, null, null, null, null, null, writer); FifoScheduler schedular = new FifoScheduler(); + schedular.setRMContext(rmContext); schedular.reinitialize(new Configuration(), rmContext); QueueMetrics metrics = schedular.getRootQueueMetrics(); int beforeAppsSubmitted = metrics.getAppsSubmitted(); @@ -186,6 +187,7 @@ public void testNodeLocalAssignment() throws Exception { null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(); + scheduler.setRMContext(rmContext); scheduler.reinitialize(new Configuration(), rmContext); RMNode node0 = MockNodes.newNodeInfo(1, @@ -256,6 +258,7 @@ public void testUpdateResourceOnNode() throws Exception { return nodes; } }; + scheduler.setRMContext(rmContext); scheduler.reinitialize(new Configuration(), rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 4), 1, "127.0.0.1"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 2c2aae6..67e9dda 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -203,10 +203,11 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, + cs.setRMContext(new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null)); + cs.init(conf); return cs; } @@ -281,7 +282,7 @@ public static FifoScheduler mockFifoScheduler() throws Exception { FifoScheduler fs = new FifoScheduler(); fs.setConf(new YarnConfiguration()); - fs.reinitialize(conf, null); + fs.init(conf); return fs; }