diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java index 2bc0407..acca23b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java @@ -29,6 +29,10 @@ * * * @see QueueInfo @@ -41,7 +45,12 @@ * Stopped - Not accepting submissions of new applications. */ STOPPED, - + + /** + * Draining - Not accepting submissions of new applications, + * and waiting for applications finish + */ + DRAINING, /** * Running - normal operation. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java new file mode 100644 index 0000000..bf1e3e0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + +/** + * + * QueueStateManager who would manage the queue state. + * + */ +@SuppressWarnings("rawtypes") +@Private +@Unstable +public class QueueStateManager { + + private static final Log LOG = LogFactory.getLog(QueueStateManager.class); + + private SchedulerQueueManager queueContext; + + public void initialize(SchedulerQueueManager newQueueManager) { + this.queueContext = newQueueManager; + } + + /** + * Stop the queue. + * @param queueName the queue name + * @return true if the queue can be stop. + */ + @SuppressWarnings("unchecked") + public synchronized boolean stopQueue(String queueName) { + SchedulerQueue queue = queueContext.getQueue(queueName); + if (queue == null) { + LOG.info("The specified queue:" + queueName + " does not exist!"); + return false; + } + QueueState currentState = queue.getState(); + if (currentState != QueueState.RUNNING) { + LOG.info("We can not stop a non-running queue:" + queueName + + ". The current state is " + queue.getState()); + return false; + } else { + if (queue.haveActiveApps()) { + queue.updateQueueState(QueueState.DRAINING); + } else { + queue.updateQueueState(QueueState.STOPPED); + } + if (queue.getChildQueues() != null) { + for(T child : queue.getChildQueues()) { + stopQueue(child.getQueueName()); + } + } + return true; + } + } + + /** + * Active the queue. + * @param queueName the queue name + * @return true if the queue can be activated + */ + @SuppressWarnings("unchecked") + public synchronized boolean activateQueue(String queueName) { + SchedulerQueue queue = queueContext.getQueue(queueName); + if (queue == null) { + LOG.info("The specified queue:" + queueName + " does not exist!"); + return false; + } + if (queue.getState() == QueueState.RUNNING) { + LOG.info("The specified queue:" + queueName + + " is already in the RUNNING state."); + return true; + } else { + T parent = queue.getParent(); + if (parent == null || parent.getState() == QueueState.RUNNING) { + queue.updateQueueState(QueueState.RUNNING); + return true; + } + LOG.info("The parent Queue:" + parent.getQueueName() + " is not running." + + " Please activate the parent queue first"); + return false; + } + } + + /** + * Triggered if any applications finish. + * @param queueName the queue name + */ + @SuppressWarnings("unchecked") + public void appFinished(String queueName) { + SchedulerQueue queue = queueContext.getQueue(queueName); + if (queue.getState() == QueueState.DRAINING) { + if (!queue.haveActiveApps()) { + queue.updateQueueState(QueueState.STOPPED); + } + } + } + + /** + * Whether this queue can be deleted. + * @param queueName the queue name + * @return true if the queue can be deleted + */ + @SuppressWarnings("unchecked") + public boolean canDelete(String queueName) { + SchedulerQueue queue = queueContext.getQueue(queueName); + if (queue == null) { + LOG.info("The specified queue:" + queueName + " does not exist!"); + return false; + } + if (queue.getState() == QueueState.STOPPED){ + return true; + } + LOG.info("Need to stop the specific queue:" + queueName + " first."); + return false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java new file mode 100644 index 0000000..f45548c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.yarn.api.records.QueueState; + +/** + * + * Represents a queue in Scheduler. + * + */ +@SuppressWarnings("rawtypes") +@LimitedPrivate("yarn") +public interface SchedulerQueue extends Queue { + + /** + * Get list of child queues. + * @return a list of child queues + */ + List getChildQueues(); + + /** + * Get the parent queue. + * @return the parent queue + */ + T getParent(); + + /** + * Get current queue state. + * @return the queue state + */ + QueueState getState(); + + /** + * Update the queue state. + * @param state the queue state + */ + void updateQueueState(QueueState state); + + /** + * Whether there is any active application in this queue. + * @return true if any application is active in this queue + */ + boolean haveActiveApps(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java index 92b989a..24797a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java @@ -29,9 +29,10 @@ * Context of the Queues in Scheduler. * */ +@SuppressWarnings("rawtypes") @Private @Unstable -public interface SchedulerQueueManager { /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3daabaf..832bae8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -74,7 +74,7 @@ final Resource minimumAllocation; volatile Resource maximumAllocation; - volatile QueueState state; + private volatile QueueState state; final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; @@ -105,6 +105,9 @@ protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + protected ReentrantReadWriteLock.ReadLock queueStateReadLock; + protected ReentrantReadWriteLock.WriteLock queueStateWriteLock; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -132,6 +135,10 @@ public AbstractCSQueue(CapacitySchedulerContext cs, ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); + + ReentrantReadWriteLock queueStatelock = new ReentrantReadWriteLock(); + queueStateReadLock = queueStatelock.readLock(); + queueStateWriteLock = queueStatelock.writeLock(); } protected void setupConfigurableCapacities() { @@ -183,7 +190,12 @@ public int getNumContainers() { @Override public QueueState getState() { - return state; + try { + this.queueStateReadLock.lock(); + return state; + } finally { + this.queueStateReadLock.unlock(); + } } @Override @@ -291,7 +303,7 @@ void setupQueueConfigs(Resource clusterResource) authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - this.state = csContext.getConfiguration().getState(getQueuePath()); + updateQueueState(csContext.getConfiguration().getState(getQueuePath())); this.acls = csContext.getConfiguration().getAcls(getQueuePath()); // Update metrics @@ -340,7 +352,7 @@ protected QueueInfo getQueueInfo() { queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setCapacity(queueCapacities.getCapacity()); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); - queueInfo.setQueueState(state); + queueInfo.setQueueState(getState()); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setQueueStatistics(getQueueStatistics()); @@ -813,4 +825,14 @@ public boolean accept(Resource cluster, return true; } + + @Override + public void updateQueueState(QueueState queueState) { + try { + this.queueStateWriteLock.lock(); + this.state = queueState; + } finally { + this.queueStateWriteLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index baf60e4..bd32272 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -56,8 +57,7 @@ */ @Stable @Private -public interface CSQueue -extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { +public interface CSQueue extends SchedulerQueue { /** * Get the parent Queue. * @return the parent queue diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e42b20c..eb1a0cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -2451,4 +2452,9 @@ public int getAsyncSchedulingPendingBacklogs() { } return 0; } + + @Override + public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { + return this.queueManager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index c41a7bf..7d29619 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -83,4 +83,6 @@ ResourceUsage getClusterResourceUsage(); ActivitiesManager getActivitiesManager(); + + CapacitySchedulerQueueManager getCapacitySchedulerQueueManager(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7a6ce56..6a3c08a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; @@ -86,6 +87,9 @@ public CSQueue hook(CSQueue queue) { private CSQueue root; private final RMNodeLabelsManager labelManager; + private QueueStateManager + queueStateManager; + /** * Construct the service. * @param conf the configuration @@ -95,6 +99,7 @@ public CapacitySchedulerQueueManager(Configuration conf, RMNodeLabelsManager labelManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; + this.queueStateManager = new QueueStateManager<>(); } @Override @@ -142,6 +147,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); LOG.info("Initialized root queue " + root); } @@ -170,6 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) clusterResource)); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); } /** @@ -358,4 +365,10 @@ public Priority getDefaultPriorityForQueue(String queueName) { } return queueToLabels; } + + @Private + public QueueStateManager + getQueueStateManager() { + return this.queueStateManager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9661206..f3054ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -273,7 +273,7 @@ protected void setupQueueConfigs(Resource clusterResource) + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + "numContainers = " + numContainers - + " [= currentNumContainers ]" + "\n" + "state = " + state + + " [= currentNumContainers ]" + "\n" + "state = " + getState() + " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder @@ -866,6 +866,11 @@ private void addApplicationAttempt(FiCaSchedulerApp application, public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager activeUsersManager.deactivateApplication(user, application); + + // Inform the queueStateManager + this.csContext.getCapacitySchedulerQueueManager().getQueueStateManager() + .appFinished(queueName); + // Inform the parent queue getParent().finishApplication(application, user); } @@ -2412,4 +2417,9 @@ public Resource getClusterResource() { return clusterResource; } } + + @Override + public boolean haveActiveApps() { + return getNumApplications() > 0; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index fd0c68b..88a9c7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -133,7 +133,7 @@ void setupQueueConfigs(Resource clusterResource) + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities - .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + ", reservationsContinueLooking=" + reservationsContinueLooking); } finally { @@ -345,7 +345,7 @@ public void submitApplication(ApplicationId applicationId, String user, "Cannot submit application " + "to non-leaf queue: " + queueName); } - if (state != QueueState.RUNNING) { + if (getState() != QueueState.RUNNING) { throw new AccessControlException("Queue " + getQueuePath() + " is STOPPED. Cannot accept submission of application: " + applicationId); @@ -402,7 +402,11 @@ private void addApplication(ApplicationId applicationId, public void finishApplication(ApplicationId application, String user) { removeApplication(application, user); - + + // Inform the queueStateManager + this.csContext.getCapacitySchedulerQueueManager().getQueueStateManager() + .appFinished(queueName); + // Inform the parent queue if (parent != null) { parent.finishApplication(application, user); @@ -1040,4 +1044,9 @@ public void apply(Resource cluster, parent.apply(cluster, request); } } + + @Override + public boolean haveActiveApps() { + return getNumApplications() > 0; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java new file mode 100644 index 0000000..4d12d07 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +public class TestQueueStateManager { + private static final String Q1 = "q1"; + private static final String Q2 = "q2"; + private static final String Q3 = "q3"; + + private final static String Q1_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q1; + private final static String Q2_PATH = + Q1_PATH + "." + Q2; + private final static String Q3_PATH = + Q1_PATH + "." + Q3; + private CapacityScheduler cs; + private YarnConfiguration conf; + + @Test + public void testQueueStateManager() throws AccessControlException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); + csConf.setQueues(Q1_PATH, new String[] {Q2, Q3}); + + csConf.setCapacity(Q1_PATH, 100); + csConf.setCapacity(Q2_PATH, 50); + csConf.setCapacity(Q3_PATH, 50); + + conf = new YarnConfiguration(csConf); + cs = new CapacityScheduler(); + + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + + QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager() + .getQueueStateManager(); + + //by default, the state of both queues should be RUNNING + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + // Stop Q2, and verify that Q2 transmits to STOPPED STATE + stateManager.stopQueue(Q2); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Stop Q1, and verify that Q1, as well as its child: Q3, + // transmits to STOPPED STATE + stateManager.stopQueue(Q1); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + Assert.assertTrue(stateManager.canDelete(Q1)); + Assert.assertTrue(stateManager.canDelete(Q2)); + Assert.assertTrue(stateManager.canDelete(Q3)); + + // Active Q2, it will fail. + Assert.assertFalse(stateManager.activateQueue(Q2)); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Now active Q1 + Assert.assertTrue(stateManager.activateQueue(Q1)); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + + // Now active Q2 and Q3 + Assert.assertTrue(stateManager.activateQueue(Q2)); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertTrue(stateManager.activateQueue(Q3)); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + Assert.assertFalse(stateManager.canDelete(Q1)); + Assert.assertFalse(stateManager.canDelete(Q2)); + Assert.assertFalse(stateManager.canDelete(Q3)); + + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + String userName = "testUser"; + cs.getQueue(Q2).submitApplication(appId, userName, Q2); + FiCaSchedulerApp app_0 = getMockApplication(appId, userName, + Resources.createResource(4, 0)); + cs.getQueue(Q2).submitApplicationAttempt(app_0, userName); + stateManager.stopQueue(Q1); + + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + cs.getQueue(Q2).finishApplicationAttempt(app_0, Q2); + cs.getQueue(Q2).finishApplication(appId, userName); + //stateManager.appFinished(Q1); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + } + + private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, + Resource amResource) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + doReturn(applicationAttemptId.getApplicationId()). + when(application).getApplicationId(); + doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); + doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); + doReturn(Priority.newInstance(0)).when(application).getPriority(); + doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) + .getAppAMNodePartitionName(); + doReturn(amResource).when(application).getAMResource( + CommonNodeLabelsManager.NO_LABEL); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) + .thenCallRealMethod(); + return application; + } +}