diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 3eb5222..a994ff0 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -1262,8 +1262,9 @@ public MyFifoScheduler(RMContext rmContext) { super(); try { Configuration conf = new Configuration(); - reinitialize(conf, rmContext); - } catch (IOException ie) { + setRMContext(rmContext); + serviceInit(conf); + } catch (Exception ie) { LOG.info("add application failed with ", ie); assert (false); } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 9a3cb24..0f811b1 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -86,8 +87,8 @@ import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; -public class ResourceSchedulerWrapper implements - SchedulerWrapper,ResourceScheduler,Configurable { +public class ResourceSchedulerWrapper extends AbstractYarnScheduler + implements SchedulerWrapper,ResourceScheduler,Configurable { private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; @@ -143,6 +144,7 @@ public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); public ResourceSchedulerWrapper() { + super(ResourceSchedulerWrapper.class.getName()); samplerLock = new ReentrantLock(); queueLock = new ReentrantLock(); } @@ -792,9 +794,28 @@ public Configuration getConf() { } @Override - public void reinitialize(Configuration entries, RMContext rmContext) - throws IOException { - scheduler.reinitialize(entries, rmContext); + public void init(Configuration conf) { + ((AbstractYarnScheduler) scheduler).init(conf); + } + + @Override + public void start() { + ((AbstractYarnScheduler) scheduler).start(); + } + + @Override + public void stop() { + ((AbstractYarnScheduler) scheduler).stop(); + } + + @Override + public void setRMContext(RMContext rmContext) { + scheduler.setRMContext(rmContext); + } + + @Override + public void reinitialize(Configuration conf) throws IOException { + scheduler.reinitialize(conf); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 971603a..eb9f589 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -305,8 +305,8 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); - RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", + rmContext.getScheduler().reinitialize(getConfig()); + RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", "AdminService"); return recordFactory.newRecordInstance(RefreshQueuesResponse.class); } catch (IOException ioe) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 83d7f9d..fc2b3d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -353,6 +353,8 @@ protected void serviceInit(Configuration configuration) throws Exception { // Initialize the scheduler scheduler = createScheduler(); + scheduler.setRMContext(rmContext); + addIfService(scheduler); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); @@ -381,12 +383,6 @@ protected void serviceInit(Configuration configuration) throws Exception { DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - // creating monitors that handle preemption createPolicyMonitors(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index e460f1c..ba7dcf4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -31,11 +32,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -public class AbstractYarnScheduler { +public class AbstractYarnScheduler extends AbstractService { protected RMContext rmContext; protected Map applications; + /** + * Construct the service. + * + * @param name service name + */ + public AbstractYarnScheduler(String name) { + super(name); + } + public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 8840881..55d5dbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -34,10 +34,18 @@ @LimitedPrivate("yarn") @Evolving public interface ResourceScheduler extends YarnScheduler, Recoverable { + + /** + * Set RMContext for ResourceScheduler. + * This method should be called before reinitialize + * @param rmContext created by ResourceManager + */ + void setRMContext(RMContext rmContext); + /** * Re-initialize the ResourceScheduler. * @param conf configuration * @throws IOException */ - void reinitialize(Configuration conf, RMContext rmContext) throws IOException; + void reinitialize(Configuration conf) throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0197c5b..fb2fddc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -191,12 +191,12 @@ public Configuration getConf() { private Resource minimumAllocation; private Resource maximumAllocation; - private boolean initialized = false; - private ResourceCalculator calculator; private boolean usePortForNodeName; - public CapacityScheduler() {} + public CapacityScheduler() { + super(CapacityScheduler.class.getName()); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -256,43 +256,62 @@ public RMContext getRMContext() { public Resource getClusterResources() { return clusterResource; } - + + @VisibleForTesting + public void serviceInitInternal(Configuration conf) throws IOException { + this.conf = new CapacitySchedulerConfiguration(conf); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + this.maximumAllocation = this.conf.getMaximumAllocation(); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = + new ConcurrentHashMap(); + + initializeQueues(this.conf); + + LOG.info("Initialized CapacityScheduler with " + + "calculator=" + getResourceCalculator().getClass() + ", " + + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + + "maximumAllocation=<" + getMaximumResourceCapability() + ">"); + } + @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { - if (!initialized) { - this.conf = new CapacitySchedulerConfiguration(conf); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap(); - this.rmContext = rmContext; - - initializeQueues(this.conf); - - initialized = true; - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">"); - } else { + public void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } - CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = new CapacitySchedulerConfiguration(conf); - validateConf(this.conf); - try { - LOG.info("Re-initializing queues..."); - reinitializeQueues(this.conf); - } catch (Throwable t) { - this.conf = oldConf; - throw new IOException("Failed to re-init queues", t); - } + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) throws IOException { + CapacitySchedulerConfiguration oldConf = this.conf; + this.conf = new CapacitySchedulerConfiguration(conf); + validateConf(this.conf); + try { + LOG.info("Re-initializing queues..."); + reinitializeQueues(this.conf); + } catch (Throwable t) { + this.conf = oldConf; + throw new IOException("Failed to re-init queues", t); } } + @Override + public synchronized void + setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + @Private public static final String ROOT_QUEUE = CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3ff3b04..63ac11d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -123,7 +123,6 @@ @SuppressWarnings("unchecked") public class FairScheduler extends AbstractYarnScheduler implements ResourceScheduler { - private boolean initialized; private FairSchedulerConfiguration conf; private Resource minimumAllocation; private Resource maximumAllocation; @@ -198,8 +197,14 @@ private AllocationFileLoaderService allocsLoader; @VisibleForTesting AllocationConfiguration allocConf; + + private Thread updateThread; + private Thread schedulingThread; + private volatile boolean isUpdateThreadRunning = false; + private volatile boolean isSchedulingThreadRunning = false; public FairScheduler() { + super(FairScheduler.class.getName()); clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); @@ -275,7 +280,7 @@ private FSSchedulerApp getCurrentAttemptForContainer( */ private class UpdateThread implements Runnable { public void run() { - while (true) { + while (isSchedulingThreadRunning) { try { Thread.sleep(UPDATE_INTERVAL); update(); @@ -986,7 +991,7 @@ private synchronized void nodeUpdate(RMNode nm) { } private void continuousScheduling() { - while (true) { + while (isSchedulingThreadRunning) { List nodeIdList = new ArrayList(nodes.keySet()); Collections.sort(nodeIdList, nodeAvailableResourceComparator); @@ -1205,85 +1210,122 @@ public void recover(RMState state) throws Exception { // NOT IMPLEMENTED } - @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - if (!initialized) { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); - - initialized = true; - - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + @VisibleForTesting + void serviceInitInternal(Configuration conf) throws IOException { + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + // This stores per-application scheduling information + this.applications = + new ConcurrentHashMap(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); + + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); + isUpdateThreadRunning = true; + updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + isSchedulingThreadRunning = true; + schedulingThread = new Thread( new Runnable() { @Override public void run() { continuousScheduling(); } } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); - } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); - } - allocsLoader.start(); - } else { - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - LOG.error("Failed to reload allocations file", e); - } + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + } + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } + + void serviceStartInternal() throws Exception { + updateThread.start(); + if (continuousSchedulingEnabled) { + schedulingThread.start(); + } + allocsLoader.start(); + } + + @Override + protected void serviceStart() throws Exception { + serviceStartInternal(); + super.serviceStart(); + } + + void serviceStopInternal() throws Exception { + isUpdateThreadRunning = false; + updateThread.interrupt(); + if (continuousSchedulingEnabled) { + isSchedulingThreadRunning = false; + schedulingThread.interrupt(); } + allocsLoader.stop(); + } + + @Override + protected void serviceStop() throws Exception { + serviceStopInternal(); + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } + } + + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 696a64c..9f16590 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -120,7 +120,6 @@ protected Map nodes = new ConcurrentHashMap(); - private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; private boolean usePortForNodeName; @@ -186,6 +185,13 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { } }; + /** + * Construct the service. + */ + public FifoScheduler() { + super(FifoScheduler.class.getName()); + } + @Override public synchronized void setConf(Configuration conf) { this.conf = conf; @@ -231,33 +237,53 @@ public Resource getMaximumResourceCapability() { return maximumAllocation; } - @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException - { - setConf(conf); - if (!this.initialized) { - validateConf(conf); - this.rmContext = rmContext; - //Use ConcurrentSkipListMap because applications need to be ordered - this.applications = - new ConcurrentSkipListMap(); - this.minimumAllocation = + @VisibleForTesting + public synchronized void serviceInitInternal(Configuration conf) { + reinitialize(conf); + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap(); + this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = + this.maximumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - this.usePortForNodeName = conf.getBoolean( - YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, - conf); - this.activeUsersManager = new ActiveUsersManager(metrics); - this.initialized = true; - } + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, + conf); + this.activeUsersManager = new ActiveUsersManager(metrics); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + serviceInitInternal(conf); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf) { + setConf(conf); + validateConf(conf); + } + + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } private static final Allocation EMPTY_ALLOCATION = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index cb40ee3..2e438fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -66,7 +68,7 @@ private final int GB = 1024; private static YarnConfiguration conf; - + @BeforeClass public static void setup() { conf = new YarnConfiguration(); @@ -81,7 +83,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.reinitialize(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -98,6 +100,7 @@ public void test() throws Exception { rootLogger.setLevel(Level.DEBUG); MockRM rm = new MockRM(conf); rm.start(); + while(!rm.isInState(Service.STATE.STARTED)); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); @@ -234,7 +237,7 @@ public void testReconnectedNode() throws Exception { conf.setQueues("default", new String[] {"default"}); conf.setCapacity("default", 100); FifoScheduler fs = new FifoScheduler(); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); @@ -441,7 +444,6 @@ public void testHeadroom() throws Exception { rm.stop(); } - public static void main(String[] args) throws Exception { TestFifoScheduler t = new TestFifoScheduler(); t.test(); @@ -449,4 +451,5 @@ public static void main(String[] args) throws Exception { t.testNonDefaultMinimumAllocation(); t.testReconnectedNode(); } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 08efe29..e8f8d06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -27,12 +27,9 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -55,16 +52,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -133,7 +126,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.reinitialize(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -147,7 +140,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, null); + scheduler.reinitialize(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -345,15 +338,15 @@ public void testRefreshQueues() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); - cs.reinitialize(conf,null); + cs.reinitialize(conf); checkQueueCapacities(cs, 80f, 20f); } @@ -444,10 +437,11 @@ public void testParseQueue() throws IOException { conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f); conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); + cs.reinitialize(conf); } @Test @@ -457,10 +451,11 @@ public void testReconnectedNode() throws Exception { setupQueueConfiguration(csConf); CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, - null, null, new RMContainerTokenSecretManager(csConf), - new NMTokenSecretManagerInRM(csConf), - new ClientToAMTokenSecretManagerInRM())); + cs.setRMContext(new RMContextImpl(null, null, null, null, + null, null, new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM())); + cs.reinitialize(csConf); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); @@ -484,10 +479,11 @@ public void testRefreshQueuesWithNewQueue() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + cs.setRMContext(new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); + cs.reinitialize(conf); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); // Add a new queue b4 @@ -503,7 +499,7 @@ public void testRefreshQueuesWithNewQueue() throws Exception { conf.setCapacity(B2, B2_CAPACITY); conf.setCapacity(B3, B3_CAPACITY); conf.setCapacity(B4, B4_CAPACITY); - cs.reinitialize(conf,null); + cs.reinitialize(conf); checkQueueCapacities(cs, 80f, 20f); // Verify parent for B4 @@ -635,10 +631,11 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); - cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, - null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); + cs.setRMContext(new RMContextImpl(rmDispatcher, null, null, null, + null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); + cs.reinitialize(conf); SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 5d91e8f..c416bea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -139,7 +139,8 @@ public void setUp() throws Exception { queues, queues, TestUtils.spyHook); - cs.reinitialize(csConf, rmContext); + cs.serviceInit(csConf); + cs.setRMContext(rmContext); } private static final String A = "a"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index c86d6b3..9284bff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -44,11 +44,12 @@ public void testQueueParsing() throws Exception { CapacityScheduler capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(conf); - capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, - null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM())); - + capacityScheduler.setRMContext(new RMContextImpl(null, null, + null, null, null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM())); + capacityScheduler.serviceInitInternal(conf); + CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA); @@ -142,7 +143,7 @@ public void testRootQueueParsing() throws Exception { CapacityScheduler capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); + capacityScheduler.serviceInitInternal(conf); } public void testMaxCapacity() throws Exception { @@ -164,7 +165,7 @@ public void testMaxCapacity() throws Exception { try { capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); + capacityScheduler.serviceInitInternal(conf); } catch (IllegalArgumentException iae) { fail = true; } @@ -176,17 +177,17 @@ public void testMaxCapacity() throws Exception { // Now this should work capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); - capacityScheduler.reinitialize(conf, null); - + capacityScheduler.serviceInitInternal(conf); + fail = false; try { - LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A); - a.setMaxCapacity(45); + LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A); + a.setMaxCapacity(45); } catch (IllegalArgumentException iae) { fail = true; } Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " + - "setMaxCap", fail); + "setMaxCap", fail); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index e4dc801..1d0d95f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -40,7 +40,7 @@ private Resource maxResource = Resources.createResource(10); @Before - public void setup() throws IOException { + public void setup() throws Exception { FairScheduler scheduler = new FairScheduler(); Configuration conf = createConfiguration(); // All tests assume only one assignment per node update @@ -48,7 +48,8 @@ public void setup() throws IOException { ResourceManager resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.setRMContext(resourceManager.getRMContext()); String queueName = "root.queue1"; scheduler.allocConf = mock(AllocationConfiguration.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b251ce7..bae67ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -128,7 +128,7 @@ public void tick(int seconds) { // HELPER METHODS @Before - public void setUp() throws IOException { + public void setUp() throws Exception { scheduler = new FairScheduler(); conf = createConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); @@ -161,11 +161,12 @@ public void tearDown() { @Test (timeout = 30000) public void testConfValidation() throws Exception { ResourceScheduler scheduler = new FairScheduler(); + FairScheduler fs = (FairScheduler) scheduler; Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + fs.serviceInitInternal(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -179,7 +180,7 @@ public void testConfValidation() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, null); + fs.serviceInitInternal(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -301,9 +302,9 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); + true); conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, - 10); + 10); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 5000); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, @@ -312,7 +313,8 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); Assert.assertEquals(true, scheduler.sizeBasedWeight); @@ -338,8 +340,8 @@ public void testNonMinZeroResourcesSettings() throws IOException { conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.serviceInitInternal(conf); Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); @@ -356,7 +358,7 @@ public void testMinZeroResourcesSettings() throws IOException { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); @@ -365,7 +367,8 @@ public void testMinZeroResourcesSettings() throws IOException { @Test public void testAggregateCapacityTracking() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -390,7 +393,8 @@ public void testAggregateCapacityTracking() throws Exception { @Test public void testSimpleFairShareCalculation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -417,7 +421,8 @@ public void testSimpleFairShareCalculation() throws IOException { @Test public void testSimpleHierarchicalFairShareCalculation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) int capacity = 10 * 24; @@ -450,7 +455,8 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException { @Test public void testHierarchicalQueuesSimilarParents() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child", true); @@ -474,7 +480,8 @@ public void testHierarchicalQueuesSimilarParents() throws IOException { @Test public void testSchedulerRootQueueMetrics() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -513,7 +520,8 @@ public void testSchedulerRootQueueMetrics() throws Exception { @Test (timeout = 5000) public void testSimpleContainerAllocation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -561,7 +569,8 @@ public void testSimpleContainerAllocation() throws IOException { @Test (timeout = 5000) public void testSimpleContainerReservation() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -615,7 +624,8 @@ public void testSimpleContainerReservation() throws Exception { @Test public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); @@ -641,7 +651,8 @@ public void testUserAsDefaultQueue() throws Exception { @Test public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); @@ -667,7 +678,8 @@ public void testNotUserAsDefaultQueue() throws Exception { @Test public void testEmptyQueueName() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // only default queue assertEquals(1, scheduler.getQueueManager().getLeafQueues().size()); @@ -687,8 +699,9 @@ public void testEmptyQueueName() throws Exception { @Test public void testAssignToQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); @@ -706,7 +719,8 @@ public void testAssignToQueue() throws Exception { public void testQueuePlacementWithPolicy() throws Exception { conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, SimpleGroupsMapping.class, GroupMappingServiceProvider.class); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId appId; @@ -764,7 +778,8 @@ public void testFairShareWithMinAlloc() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -796,7 +811,8 @@ else if (p.getName().equals("root.queueB")) { */ @Test public void testQueueDemandCalculation() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId id11 = createAppAttemptId(1, 1); scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); @@ -846,7 +862,8 @@ public void testQueueDemandCalculation() throws Exception { @Test public void testAppAdditionAndRemoval() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId attemptId =createAppAttemptId(1, 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", "user1"); @@ -896,7 +913,8 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); Collection leafQueues = queueManager.getLeafQueues(); @@ -929,7 +947,8 @@ public void testConfigureRootQueue() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSQueue root = queueManager.getRootQueue(); @@ -955,7 +974,8 @@ public void testIsStarvedForMinShare() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add one big node (only care about aggregate capacity) RMNode node1 = @@ -1012,8 +1032,9 @@ public void testIsStarvedForFairShare() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, @@ -1085,7 +1106,8 @@ public void testChoiceOfPreemptedContainers() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Create four nodes RMNode node1 = @@ -1252,7 +1274,8 @@ public void testPreemptionDecision() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Create four nodes RMNode node1 = @@ -1349,7 +1372,8 @@ public void testPreemptionDecision() throws Exception { @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -1391,8 +1415,9 @@ public void testUserMaxRunningApps() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // Add a node RMNode node1 = MockNodes @@ -1432,7 +1457,8 @@ public void testUserMaxRunningApps() throws Exception { @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); // Add a node RMNode node1 = @@ -1512,8 +1538,9 @@ public void testAclSubmitApplication() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname", 1); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", @@ -1527,7 +1554,8 @@ public void testAclSubmitApplication() throws Exception { @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1576,7 +1604,8 @@ public void testMultipleNodesSingleRackRequest() throws Exception { @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1620,7 +1649,8 @@ public void testFifoWithinQueue() throws Exception { @Test(timeout = 3000) public void testMaxAssign() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, @@ -1663,7 +1693,8 @@ public void testMaxAssign() throws Exception { */ @Test(timeout = 5000) public void testAssignContainer() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); final String user = "user1"; final String fifoQueue = "fifo"; @@ -1747,8 +1778,9 @@ public void testNotAllowSubmitApplication() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + int appId = this.APP_ID++; String user = "usernotallow"; String queue = "queue1"; @@ -1797,7 +1829,8 @@ public void testNotAllowSubmitApplication() throws Exception { @Test public void testReservationThatDoesntFit() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes @@ -1825,7 +1858,8 @@ public void testReservationThatDoesntFit() throws IOException { @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); @@ -1853,7 +1887,8 @@ public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { @Test public void testStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -1893,7 +1928,8 @@ public void testStrictLocality() throws IOException { @Test public void testCancelStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -1943,7 +1979,8 @@ public void testCancelStrictLocality() throws IOException { */ @Test public void testReservationsStrictLocality() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2"); @@ -1983,7 +2020,8 @@ public void testReservationsStrictLocality() throws IOException { @Test public void testNoMoreCpuOnNode() throws IOException { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), 1, "127.0.0.1"); @@ -2004,7 +2042,8 @@ public void testNoMoreCpuOnNode() throws IOException { @Test public void testBasicDRFAssignment() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); @@ -2044,7 +2083,8 @@ public void testBasicDRFAssignment() throws Exception { */ @Test public void testBasicDRFWithQueues() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), 1, "127.0.0.1"); @@ -2080,7 +2120,8 @@ public void testBasicDRFWithQueues() throws Exception { @Test public void testDRFHierarchicalQueues() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), 1, "127.0.0.1"); @@ -2148,8 +2189,8 @@ public void testDRFHierarchicalQueues() throws Exception { public void testHostPortNodeName() throws Exception { conf.setBoolean(YarnConfiguration .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - scheduler.reinitialize(conf, - resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1", 1); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -2229,8 +2270,9 @@ public void testUserAndQueueMaxRunningApps() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); verifyAppRunnable(attId1, true); @@ -2283,8 +2325,9 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); verifyAppRunnable(attId1, true); @@ -2343,8 +2386,10 @@ public void testContinuousScheduling() throws Exception { FairScheduler fs = new FairScheduler(); Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); - fs.reinitialize(conf, resourceManager.getRMContext()); + true); + fs.setRMContext(resourceManager.getRMContext()); + fs.serviceInit(conf); + fs.serviceStart(); Assert.assertTrue("Continuous scheduling should be enabled.", fs.isContinuousSchedulingEnabled()); @@ -2424,7 +2469,8 @@ public void testDontAllowUndeclaredPools() throws Exception{ out.println(""); out.close(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); QueueManager queueManager = scheduler.getQueueManager(); FSLeafQueue jerryQueue = queueManager.getLeafQueue("jerry", false); @@ -2454,7 +2500,8 @@ public void testDontAllowUndeclaredPools() throws Exception{ @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); final int GB = 1024; String host = "127.0.0.1"; @@ -2506,7 +2553,8 @@ public void testBlacklistNodes() throws Exception { @Test public void testGetAppsInQueue() throws Exception { - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInitInternal(conf); ApplicationAttemptId appAttId1 = createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1"); @@ -2544,6 +2592,9 @@ public void testGetAppsInQueue() throws Exception { public void testAddAndRemoveAppFromFairScheduler() throws Exception { FairScheduler scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.serviceStart(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( scheduler.getSchedulerApplications(), scheduler, "default"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java index 633bca4..24db09c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java @@ -38,7 +38,7 @@ private ResourceManager resourceManager; @Before - public void setUp() throws IOException { + public void setUp() throws Exception { scheduler = new FairScheduler(); Configuration conf = new YarnConfiguration(); @@ -51,7 +51,8 @@ public void setUp() throws IOException { resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); + scheduler.serviceInit(conf); + scheduler.setRMContext(resourceManager.getRMContext()); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index af819d1..e17a703 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -57,13 +57,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -91,7 +88,7 @@ public void setUp() throws Exception { resourceManager = new ResourceManager(); Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); resourceManager.init(conf); } @@ -144,7 +141,9 @@ public void testAppAttemptMetrics() throws Exception { null, null, null, null, null, null, null); FifoScheduler schedular = new FifoScheduler(); - schedular.reinitialize(new Configuration(), rmContext); + Configuration conf = new Configuration(); + schedular.setRMContext(rmContext); + schedular.serviceInit(conf); QueueMetrics metrics = schedular.getRootQueueMetrics(); int beforeAppsSubmitted = metrics.getAppsSubmitted(); @@ -181,7 +180,8 @@ public void testNodeLocalAssignment() throws Exception { null, containerTokenSecretManager, nmTokenSecretManager, null); FifoScheduler scheduler = new FifoScheduler(); - scheduler.reinitialize(new Configuration(), rmContext); + scheduler.serviceInit(new Configuration()); + scheduler.setRMContext(rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(1024 * 64), 1, "127.0.0.1"); @@ -250,7 +250,8 @@ public void testUpdateResourceOnNode() throws Exception { return nodes; } }; - scheduler.reinitialize(new Configuration(), rmContext); + scheduler.serviceInit(new Configuration()); + scheduler.setRMContext(rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); @@ -537,6 +538,8 @@ public void testBlackListNodes() throws Exception { MockRM rm = new MockRM(conf); rm.start(); FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + fs.setRMContext(rm.getRMContext()); + fs.serviceInitInternal(conf); String host = "127.0.0.1"; RMNode node = @@ -568,14 +571,15 @@ public void testBlackListNodes() throws Exception { @Test public void testGetAppsInQueue() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); + Application application_0 = new Application("user_0", resourceManager); application_0.submit(); Application application_1 = new Application("user_0", resourceManager); application_1.submit(); - - ResourceScheduler scheduler = resourceManager.getResourceScheduler(); - + List appsInDefault = scheduler.getAppsInQueue("default"); assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); @@ -591,6 +595,8 @@ public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { ResourceScheduler.class); MockRM rm = new MockRM(conf); FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); + fs.setRMContext(rm.getRMContext()); + fs.serviceInitInternal(conf); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( fs.getSchedulerApplications(), fs, "queue"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 74dc95a..42ffc5e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -203,10 +203,11 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException { CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, + cs.setRMContext(new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM())); + cs.serviceInitInternal(conf); return cs; } @@ -281,7 +282,7 @@ public static FifoScheduler mockFifoScheduler() throws Exception { FifoScheduler fs = new FifoScheduler(); fs.setConf(new YarnConfiguration()); - fs.reinitialize(conf, null); + fs.serviceInitInternal(conf); return fs; }