diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index c62fa39..bcd811f 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -1262,8 +1262,9 @@ public MyFifoScheduler(RMContext rmContext) { super(); try { Configuration conf = new Configuration(); - reinitialize(conf, rmContext); - } catch (IOException ie) { + setRMContext(rmContext); + serviceInit(conf); + } catch (Exception ie) { LOG.info("add application failed with ", ie); assert (false); } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index f923733..5da69bd 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -87,11 +88,12 @@ import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; -public class ResourceSchedulerWrapper implements - SchedulerWrapper,ResourceScheduler,Configurable { +public class ResourceSchedulerWrapper extends AbstractYarnScheduler + implements SchedulerWrapper,ResourceScheduler,Configurable { private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; + private RMContext rmContext; // counters for scheduler allocate/handle operations private Counter schedulerAllocateCounter; private Counter schedulerHandleCounter; @@ -144,6 +146,7 @@ public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); public ResourceSchedulerWrapper() { + super(ResourceSchedulerWrapper.class.getName()); samplerLock = new ReentrantLock(); queueLock = new ReentrantLock(); } @@ -793,9 +796,28 @@ public Configuration getConf() { } @Override - public void reinitialize(Configuration entries, RMContext rmContext) - throws IOException { - scheduler.reinitialize(entries, rmContext); + public void init(Configuration conf) { + ((AbstractYarnScheduler) scheduler).init(conf); + } + + @Override + public void start() { + ((AbstractYarnScheduler) scheduler).start(); + } + + @Override + public void stop() { + ((AbstractYarnScheduler) scheduler).stop(); + } + + @Override + public void setRMContext(RMContext rmContext) { + scheduler.setRMContext(rmContext); + } + + @Override + public void reinitialize(Configuration conf) throws IOException { + scheduler.reinitialize(conf); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c5b2651..daa9597 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -330,7 +330,7 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + rmContext.getScheduler().reinitialize(getConfig()); RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); return response; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 141a9c6..d6591f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -396,6 +396,8 @@ protected void serviceInit(Configuration configuration) throws Exception { // Initialize the scheduler scheduler = createScheduler(); + scheduler.setRMContext(rmContext); + addIfService(scheduler); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); @@ -424,12 +426,6 @@ protected void serviceInit(Configuration configuration) throws Exception { DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - // creating monitors that handle preemption createPolicyMonitors(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..3124c17 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -33,7 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; -public abstract class AbstractYarnScheduler implements ResourceScheduler { +public abstract class AbstractYarnScheduler extends AbstractService + implements ResourceScheduler { protected RMContext rmContext; protected Map applications; @@ -42,6 +44,15 @@ protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + /** + * Construct the service. + * + * @param name service name + */ + public AbstractYarnScheduler(String name) { + super(name); + } + public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 8840881..55d5dbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -34,10 +34,18 @@ @LimitedPrivate("yarn") @Evolving public interface ResourceScheduler extends YarnScheduler, Recoverable { + + /** + * Set RMContext for ResourceScheduler. + * This method should be called before reinitialize + * @param rmContext created by ResourceManager + */ + void setRMContext(RMContext rmContext); + /** * Re-initialize the ResourceScheduler. * @param conf configuration * @throws IOException */ - void reinitialize(Configuration conf, RMContext rmContext) throws IOException; + void reinitialize(Configuration conf) throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6c392b5..990c308 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -189,12 +189,12 @@ public Configuration getConf() { private Resource minimumAllocation; private Resource maximumAllocation; - private boolean initialized = false; - private ResourceCalculator calculator; private boolean usePortForNodeName; - public CapacityScheduler() {} + public CapacityScheduler() { + super(CapacityScheduler.class.getName()); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -254,43 +254,64 @@ public RMContext getRMContext() { public Resource getClusterResources() { return clusterResource; } - + + @VisibleForTesting + public void serviceInitInternal(Configuration conf) throws IOException { + Configuration configuration = new Configuration(conf); + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + this.maximumAllocation = this.conf.getMaximumAllocation(); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = + new ConcurrentHashMap(); + + initializeQueues(this.conf); + + LOG.info("Initialized CapacityScheduler with " + + "calculator=" + getResourceCalculator().getClass() + ", " + + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + + "maximumAllocation=<" + getMaximumResourceCapability() + ">"); + } + @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { + public void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) throws IOException { Configuration configuration = new Configuration(conf); - if (!initialized) { - this.rmContext = rmContext; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap(); - - initializeQueues(this.conf); - - initialized = true; - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">"); - } else { - CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - try { - LOG.info("Re-initializing queues..."); - reinitializeQueues(this.conf); - } catch (Throwable t) { - this.conf = oldConf; - throw new IOException("Failed to re-init queues", t); - } + CapacitySchedulerConfiguration oldConf = this.conf; + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + try { + LOG.info("Re-initializing queues..."); + reinitializeQueues(this.conf); + } catch (Throwable t) { + this.conf = oldConf; + throw new IOException("Failed to re-init queues", t); } } + @Override + public synchronized void + setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 69dcf89..6122a42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -146,7 +146,11 @@ public void run() { @Override public void stop() { running = false; - reloadThread.interrupt(); + // reloadThread can be null if getAllocationFile() in init() + // returns null, so we need to check it. + if (reloadThread != null) { + reloadThread.interrupt(); + } super.stop(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3cdff7f..44d7c0d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -123,7 +123,6 @@ @Unstable @SuppressWarnings("unchecked") public class FairScheduler extends AbstractYarnScheduler { - private boolean initialized; private FairSchedulerConfiguration conf; private Resource minimumAllocation; private Resource maximumAllocation; @@ -192,8 +191,14 @@ private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; + + private Thread updateThread; + private Thread schedulingThread; + private volatile boolean isUpdateThreadRunning = false; + private volatile boolean isSchedulingThreadRunning = false; public FairScheduler() { + super(FairScheduler.class.getName()); clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); @@ -256,7 +261,7 @@ private FSSchedulerApp getCurrentAttemptForContainer( ContainerId containerId) { SchedulerApplication app = applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); + .getApplicationId()); if (app != null) { return (FSSchedulerApp) app.getCurrentAppAttempt(); } @@ -269,7 +274,7 @@ private FSSchedulerApp getCurrentAttemptForContainer( */ private class UpdateThread implements Runnable { public void run() { - while (true) { + while (isSchedulingThreadRunning) { try { Thread.sleep(UPDATE_INTERVAL); update(); @@ -997,7 +1002,7 @@ private synchronized void nodeUpdate(RMNode nm) { } private void continuousScheduling() { - while (true) { + while (isSchedulingThreadRunning) { List nodeIdList = new ArrayList(nodes.keySet()); // Sort the nodes by space available on them, so that we offer // containers on emptier nodes first, facilitating an even spread. This @@ -1228,85 +1233,127 @@ public void recover(RMState state) throws Exception { // NOT IMPLEMENTED } - @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - if (!initialized) { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); - - initialized = true; - - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + @VisibleForTesting + void serviceInitInternal(Configuration conf) throws IOException { + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + // This stores per-application scheduling information + this.applications = + new ConcurrentHashMap(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); + + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); + isUpdateThreadRunning = true; + updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + isSchedulingThreadRunning = true; + schedulingThread = new Thread( new Runnable() { @Override public void run() { continuousScheduling(); } } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); - } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); - } - allocsLoader.start(); - } else { - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - LOG.error("Failed to reload allocations file", e); - } + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); } + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } + + void serviceStartInternal() throws Exception { + updateThread.start(); + if (continuousSchedulingEnabled) { + schedulingThread.start(); + } + allocsLoader.start(); + updateThread.getState(); + } + + @Override + protected void serviceStart() throws Exception { + serviceStartInternal(); + super.serviceStart(); + } + + void serviceStopInternal() throws Exception { + isUpdateThreadRunning = false; + if (updateThread != null) { + updateThread.interrupt(); + } + if (continuousSchedulingEnabled) { + isSchedulingThreadRunning = false; + schedulingThread.interrupt(); + } + if (allocsLoader != null) { + allocsLoader.stop(); + } + } + + @Override + protected void serviceStop() throws Exception { + serviceStopInternal(); + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } + } + + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 82000e1..b83b725 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -116,7 +116,6 @@ protected Map nodes = new ConcurrentHashMap(); - private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; private boolean usePortForNodeName; @@ -187,6 +186,13 @@ public ActiveUsersManager getActiveUsersManager() { } }; + /** + * Construct the service. + */ + public FifoScheduler() { + super(FifoScheduler.class.getName()); + } + @Override public synchronized void setConf(Configuration conf) { this.conf = conf; @@ -232,33 +238,53 @@ public Resource getMaximumResourceCapability() { return maximumAllocation; } - @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException - { - setConf(conf); - if (!this.initialized) { - validateConf(conf); - this.rmContext = rmContext; - //Use ConcurrentSkipListMap because applications need to be ordered - this.applications = - new ConcurrentSkipListMap(); - this.minimumAllocation = + @VisibleForTesting + public synchronized void serviceInitInternal(Configuration conf) { + reinitialize(conf); + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap(); + this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = + this.maximumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - this.usePortForNodeName = conf.getBoolean( - YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, - conf); - this.activeUsersManager = new ActiveUsersManager(metrics); - this.initialized = true; - } + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, + conf); + this.activeUsersManager = new ActiveUsersManager(metrics); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) { + setConf(conf); + validateConf(conf); + } + + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 98b36f0..55fce68 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -66,7 +68,7 @@ private final int GB = 1024; private static YarnConfiguration conf; - + @BeforeClass public static void setup() { conf = new YarnConfiguration(); @@ -81,7 +83,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.reinitialize(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -130,6 +132,7 @@ public void test() throws Exception { rootLogger.setLevel(Level.DEBUG); MockRM rm = new MockRM(conf); rm.start(); + while(!rm.isInState(Service.STATE.STARTED)); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); @@ -266,7 +269,7 @@ public void testReconnectedNode() throws Exception { conf.setQueues("default", new String[] {"default"}); conf.setCapacity("default", 100); FifoScheduler fs = new FifoScheduler(); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); @@ -473,7 +476,6 @@ public void testHeadroom() throws Exception { rm.stop(); } - public static void main(String[] args) throws Exception { TestFifoScheduler t = new TestFifoScheduler(); t.test(); @@ -481,4 +483,5 @@ public static void main(String[] args) throws Exception { t.testNonDefaultMinimumAllocation(); t.testReconnectedNode(); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 47ec546..a542171 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,20 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -56,16 +50,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -105,8 +95,7 @@ private static float B3_CAPACITY = 20; private ResourceManager resourceManager = null; - private RMContext mockContext; - + @Before public void setUp() throws Exception { resourceManager = new ResourceManager(); @@ -120,9 +109,6 @@ public void setUp() throws Exception { resourceManager.getRMContainerTokenSecretManager().rollMasterKey(); resourceManager.getRMNMTokenSecretManager().rollMasterKey(); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - mockContext = mock(RMContext.class); - when(mockContext.getConfigurationProvider()).thenReturn( - new LocalConfigurationProvider()); } @After @@ -137,8 +123,11 @@ public void testConfValidation() throws Exception { Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); + scheduler.setRMContext(resourceManager.getRMContext()); try { - scheduler.reinitialize(conf, mockContext); + // For setting conf file. + ((CapacityScheduler)scheduler).serviceInit(conf); + scheduler.reinitialize(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -152,7 +141,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, mockContext); + scheduler.reinitialize(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -350,15 +339,16 @@ public void testRefreshQueues() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.serviceInitInternal(conf); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); - cs.reinitialize(conf, mockContext); + cs.reinitialize(conf); checkQueueCapacities(cs, 80f, 20f); } @@ -449,10 +439,11 @@ public void testParseQueue() throws IOException { conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f); conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.reinitialize(conf); } @Test @@ -462,10 +453,11 @@ public void testReconnectedNode() throws Exception { setupQueueConfiguration(csConf); CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, - null, null, new RMContainerTokenSecretManager(csConf), - new NMTokenSecretManagerInRM(csConf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(new RMContextImpl(null, null, null, null, + null, null, new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.serviceInitInternal(csConf); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); @@ -489,10 +481,11 @@ public void testRefreshQueuesWithNewQueue() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + cs.serviceInitInternal(conf); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); // Add a new queue b4 @@ -508,7 +501,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception { conf.setCapacity(B2, B2_CAPACITY); conf.setCapacity(B3, B3_CAPACITY); conf.setCapacity(B4, B4_CAPACITY); - cs.reinitialize(conf,mockContext); + cs.reinitialize(conf); checkQueueCapacities(cs, 80f, 20f); // Verify parent for B4 @@ -635,15 +628,15 @@ public void testGetAppsInQueue() throws Exception { @Test public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { - AsyncDispatcher rmDispatcher = new AsyncDispatcher(); CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); - cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, - null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(new RMContextImpl(rmDispatcher, null, null, null, + null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(),null)); + cs.serviceInitInternal(conf); SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index e9568ea..97757bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -142,7 +142,8 @@ public void setUp() throws Exception { queues, queues, TestUtils.spyHook); - cs.reinitialize(csConf, rmContext); + cs.setRMContext(rmContext); + cs.serviceInit(csConf); } private static final String A = "a"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index 21c446a..abbebc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -44,11 +44,12 @@ public void testQueueParsing() throws Exception { CapacityScheduler capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(conf); - capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, - null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); - + capacityScheduler.setRMContext(new RMContextImpl(null, null, + null, null, null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + capacityScheduler.serviceInitInternal(conf); + CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA); @@ -142,7 +143,7 @@ public void testRootQueueParsing() throws Exception { CapacityScheduler capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); + capacityScheduler.serviceInitInternal(conf); } public void testMaxCapacity() throws Exception { @@ -164,7 +165,7 @@ public void testMaxCapacity() throws Exception { try { capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); + capacityScheduler.serviceInitInternal(conf); } catch (IllegalArgumentException iae) { fail = true; } @@ -176,17 +177,17 @@ public void testMaxCapacity() throws Exception { // Now this should work capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); - + capacityScheduler.serviceInitInternal(conf); + fail = false; try { - LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A); - a.setMaxCapacity(45); + LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A); + a.setMaxCapacity(45); } catch (IllegalArgumentException iae) { fail = true; } Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " + - "setMaxCap", fail); + "setMaxCap", fail); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index e4dc801..1d0d95f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -40,7 +40,7 @@ private Resource maxResource = Resources.createResource(10); @Before - public void setup() throws IOException { + public void setup() throws Exception { FairScheduler scheduler = new FairScheduler(); Configuration conf = createConfiguration(); // All tests assume only one assignment per node update @@ -48,7 +48,8 @@ public void setup() throws IOException { ResourceManager resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.setRMContext(resourceManager.getRMContext()); String queueName = "root.queue1"; scheduler.allocConf = mock(AllocationConfiguration.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d1052bb..c638f71 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.service.Service; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -131,7 +132,7 @@ public void tick(int seconds) { // HELPER METHODS @Before - public void setUp() throws IOException { + public void setUp() throws Exception { scheduler = new FairScheduler(); conf = createConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); @@ -150,10 +151,20 @@ public void setUp() throws IOException { // to initialize the master key resourceManager.getRMContainerTokenSecretManager().rollMasterKey(); + // Main threads of FairScheduler are launched in start(). + resourceManager.start(); } @After public void tearDown() { + if (resourceManager != null) { + resourceManager.stop(); + } + + if (scheduler != null) { + scheduler.stop(); + } + scheduler = null; resourceManager = null; QueueMetrics.clearQueueMetrics(); @@ -164,11 +175,12 @@ public void tearDown() { @Test (timeout = 30000) public void testConfValidation() throws Exception { ResourceScheduler scheduler = new FairScheduler(); + FairScheduler fs = (FairScheduler) scheduler; Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + fs.serviceInitInternal(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -182,7 +194,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, null); + fs.serviceInitInternal(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -304,9 +316,9 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); + true); conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, - 10); + 10); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 5000); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, @@ -315,7 +327,8 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); Assert.assertEquals(true, scheduler.sizeBasedWeight); @@ -341,8 +354,8 @@ public void testNonMinZeroResourcesSettings() throws IOException { conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.serviceInitInternal(conf); Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); @@ -359,7 +372,7 @@ public void testMinZeroResourcesSettings() throws IOException { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); @@ -368,7 +381,8 @@ public void testMinZeroResourcesSettings() throws IOException { @Test public void testAggregateCapacityTracking() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -393,7 +407,8 @@ public void testAggregateCapacityTracking() throws Exception { @Test public void testSimpleFairShareCalculation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -420,7 +435,8 @@ public void testSimpleFairShareCalculation() throws IOException { @Test public void testSimpleHierarchicalFairShareCalculation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) int capacity = 10 * 24; @@ -453,7 +469,8 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException { @Test public void testHierarchicalQueuesSimilarParents() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true); @@ -477,7 +494,8 @@ public void testHierarchicalQueuesSimilarParents() throws IOException { @Test public void testSchedulerRootQueueMetrics() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -516,7 +534,8 @@ public void testSchedulerRootQueueMetrics() throws Exception { @Test (timeout = 5000) public void testSimpleContainerAllocation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -564,7 +583,8 @@ public void testSimpleContainerAllocation() throws IOException { @Test (timeout = 5000) public void testSimpleContainerReservation() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -618,7 +638,8 @@ public void testSimpleContainerReservation() throws Exception { @Test public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); @@ -644,7 +665,8 @@ public void testUserAsDefaultQueue() throws Exception { @Test public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); @@ -670,7 +692,8 @@ public void testNotUserAsDefaultQueue() throws Exception { @Test public void testEmptyQueueName() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // only default queue assertEquals(1, scheduler.getQueueManager().getLeafQueues().size()); @@ -690,8 +713,9 @@ public void testEmptyQueueName() throws Exception { @Test public void testAssignToQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); @@ -708,7 +732,8 @@ public void testAssignToQueue() throws Exception { @Test public void testAssignToNonLeafQueueReturnsNull() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true); scheduler.getQueueManager().getLeafQueue("root.child2", true); @@ -725,7 +750,8 @@ public void testAssignToNonLeafQueueReturnsNull() throws Exception { public void testQueuePlacementWithPolicy() throws Exception { conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, SimpleGroupsMapping.class, GroupMappingServiceProvider.class); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId appId; @@ -783,7 +809,8 @@ public void testFairShareWithMinAlloc() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -815,7 +842,8 @@ else if (p.getName().equals("root.queueB")) { */ @Test public void testQueueDemandCalculation() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); @@ -865,7 +893,8 @@ public void testQueueDemandCalculation() throws Exception { @Test public void testAppAdditionAndRemoval() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId attemptId =createAppAttemptId(1, 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", "user1"); @@ -915,7 +944,8 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); Collection leafQueues = queueManager.getLeafQueues(); @@ -948,7 +978,8 @@ public void testConfigureRootQueue() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSQueue root = queueManager.getRootQueue(); @@ -974,7 +1005,8 @@ public void testIsStarvedForMinShare() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -1031,8 +1063,9 @@ public void testIsStarvedForFairShare() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, @@ -1104,7 +1137,8 @@ public void testChoiceOfPreemptedContainers() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Create four nodes RMNode node1 = @@ -1271,7 +1305,8 @@ public void testPreemptionDecision() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Create four nodes RMNode node1 = @@ -1368,7 +1403,8 @@ public void testPreemptionDecision() throws Exception { @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -1410,8 +1446,9 @@ public void testUserMaxRunningApps() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // Add a node RMNode node1 = MockNodes @@ -1451,7 +1488,8 @@ public void testUserMaxRunningApps() throws Exception { @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -1531,8 +1569,9 @@ public void testAclSubmitApplication() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname", 1); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", @@ -1546,7 +1585,8 @@ public void testAclSubmitApplication() throws Exception { @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1595,7 +1635,8 @@ public void testMultipleNodesSingleRackRequest() throws Exception { @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1639,7 +1680,8 @@ public void testFifoWithinQueue() throws Exception { @Test(timeout = 3000) public void testMaxAssign() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, @@ -1682,7 +1724,8 @@ public void testMaxAssign() throws Exception { */ @Test(timeout = 5000) public void testAssignContainer() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); final String user = "user1"; final String fifoQueue = "fifo"; @@ -1766,8 +1809,10 @@ public void testNotAllowSubmitApplication() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + //scheduler.start(); + int appId = this.APP_ID++; String user = "usernotallow"; String queue = "queue1"; @@ -1797,8 +1842,10 @@ public void testNotAllowSubmitApplication() throws Exception { } catch (InterruptedException ex) {ex.printStackTrace();} numTries++; } - assertEquals("The application doesn't reach SUBMITTED.", - RMAppState.SUBMITTED, application.getState()); + + if (numTries > MAX_TRIES) { + fail("The application doesn't reach SUBMITTED."); + } ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); @@ -1816,7 +1863,8 @@ public void testNotAllowSubmitApplication() throws Exception { @Test public void testReservationThatDoesntFit() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1844,7 +1892,8 @@ public void testReservationThatDoesntFit() throws IOException { @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); @@ -1872,7 +1921,8 @@ public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { @Test public void testStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -1912,7 +1962,8 @@ public void testStrictLocality() throws IOException { @Test public void testCancelStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -1962,7 +2013,8 @@ public void testCancelStrictLocality() throws IOException { */ @Test public void testReservationsStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); @@ -2002,7 +2054,8 @@ public void testReservationsStrictLocality() throws IOException { @Test public void testNoMoreCpuOnNode() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), 1, "127.0.0.1"); @@ -2023,7 +2076,8 @@ public void testNoMoreCpuOnNode() throws IOException { @Test public void testBasicDRFAssignment() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); @@ -2063,7 +2117,8 @@ public void testBasicDRFAssignment() throws Exception { */ @Test public void testBasicDRFWithQueues() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), 1, "127.0.0.1"); @@ -2099,7 +2154,8 @@ public void testBasicDRFWithQueues() throws Exception { @Test public void testDRFHierarchicalQueues() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), 1, "127.0.0.1"); @@ -2167,8 +2223,8 @@ public void testDRFHierarchicalQueues() throws Exception { public void testHostPortNodeName() throws Exception { conf.setBoolean(YarnConfiguration .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - scheduler.reinitialize(conf, - resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1", 1); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -2248,8 +2304,9 @@ public void testUserAndQueueMaxRunningApps() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); verifyAppRunnable(attId1, true); @@ -2302,8 +2359,9 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); verifyAppRunnable(attId1, true); @@ -2362,8 +2420,10 @@ public void testContinuousScheduling() throws Exception { FairScheduler fs = new FairScheduler(); Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); - fs.reinitialize(conf, resourceManager.getRMContext()); + true); + fs.setRMContext(resourceManager.getRMContext()); + fs.serviceInit(conf); + fs.serviceStart(); Assert.assertTrue("Continuous scheduling should be enabled.", fs.isContinuousSchedulingEnabled()); @@ -2443,7 +2503,8 @@ public void testDontAllowUndeclaredPools() throws Exception{ out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false); @@ -2473,7 +2534,8 @@ public void testDontAllowUndeclaredPools() throws Exception{ @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); final int GB = 1024; String host = "127.0.0.1"; @@ -2525,7 +2587,8 @@ public void testBlacklistNodes() throws Exception { @Test public void testGetAppsInQueue() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId appAttId1 = createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1"); @@ -2563,14 +2626,18 @@ public void testGetAppsInQueue() throws Exception { public void testAddAndRemoveAppFromFairScheduler() throws Exception { FairScheduler scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.serviceStart(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( scheduler.getSchedulerApplications(), scheduler, "default"); } @Test public void testMoveRunnableApp() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2608,8 +2675,9 @@ public void testMoveRunnableApp() throws Exception { @Test public void testMoveNonRunnableApp() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2628,8 +2696,9 @@ public void testMoveNonRunnableApp() throws Exception { @Test public void testMoveMakesAppRunnable() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2655,8 +2724,9 @@ public void testMoveMakesAppRunnable() throws Exception { @Test (expected = YarnException.class) public void testMoveWouldViolateMaxAppsConstraints() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + QueueManager queueMgr = scheduler.getQueueManager(); queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0); @@ -2669,8 +2739,9 @@ public void testMoveWouldViolateMaxAppsConstraints() throws Exception { @Test (expected = YarnException.class) public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); queueMgr.getLeafQueue("queue2", true); @@ -2692,7 +2763,8 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { @Test (expected = YarnException.class) public void testMoveToNonexistentQueue() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); scheduler.getQueueManager().getLeafQueue("queue1", true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java index 633bca4..24db09c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java @@ -38,7 +38,7 @@ private ResourceManager resourceManager; @Before - public void setUp() throws IOException { + public void setUp() throws Exception { scheduler = new FairScheduler(); Configuration conf = new YarnConfiguration(); @@ -51,7 +51,8 @@ public void setUp() throws IOException { resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.setRMContext(resourceManager.getRMContext()); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 38a7995..c58916f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -60,13 +60,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -94,7 +91,7 @@ public void setUp() throws Exception { resourceManager = new ResourceManager(); Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); resourceManager.init(conf); } @@ -148,7 +145,9 @@ public void testAppAttemptMetrics() throws Exception { null, null, null, null, null, null, null, writer); FifoScheduler schedular = new FifoScheduler(); - schedular.reinitialize(new Configuration(), rmContext); + Configuration conf = new Configuration(); + schedular.setRMContext(rmContext); + schedular.serviceInit(conf); QueueMetrics metrics = schedular.getRootQueueMetrics(); int beforeAppsSubmitted = metrics.getAppsSubmitted(); @@ -186,7 +185,8 @@ public void testNodeLocalAssignment() throws Exception { null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(); - scheduler.reinitialize(new Configuration(), rmContext); + scheduler.serviceInit(new Configuration()); + scheduler.setRMContext(rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(1024 * 64), 1, "127.0.0.1"); @@ -256,7 +256,8 @@ public void testUpdateResourceOnNode() throws Exception { return nodes; } }; - scheduler.reinitialize(new Configuration(), rmContext); + scheduler.serviceInit(new Configuration()); + scheduler.setRMContext(rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); @@ -543,6 +544,8 @@ public void testBlackListNodes() throws Exception { MockRM rm = new MockRM(conf); rm.start(); FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + fs.setRMContext(rm.getRMContext()); + fs.serviceInitInternal(conf); String host = "127.0.0.1"; RMNode node = @@ -574,14 +577,15 @@ public void testBlackListNodes() throws Exception { @Test public void testGetAppsInQueue() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); + Application application_0 = new Application("user_0", resourceManager); application_0.submit(); Application application_1 = new Application("user_0", resourceManager); application_1.submit(); - - ResourceScheduler scheduler = resourceManager.getResourceScheduler(); - + List appsInDefault = scheduler.getAppsInQueue("default"); assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); @@ -597,6 +601,8 @@ public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { ResourceScheduler.class); MockRM rm = new MockRM(conf); FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); + fs.setRMContext(rm.getRMContext()); + fs.serviceInitInternal(conf); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( fs.getSchedulerApplications(), fs, "queue"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 2c2aae6..6b2fb0d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -203,10 +203,11 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, + cs.setRMContext(new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null)); + cs.serviceInitInternal(conf); return cs; } @@ -281,7 +282,7 @@ public static FifoScheduler mockFifoScheduler() throws Exception { FifoScheduler fs = new FifoScheduler(); fs.setConf(new YarnConfiguration()); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); return fs; }