diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
index 2bc0407..361e520 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
@@ -41,7 +41,13 @@
  * Stopped - Not accepting submissions of new applications.
  */
   STOPPED,
-  
+
+  /**
+   * Stop_running - Not accepting submissions of new applications,
+   * and waiting for running applications to finish.
+   */
+  STOP_RUNNING,
+
 /**
  * Running - normal operation.
  */
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
new file mode 100644
index 0000000..ce2f4d4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
@@ -0,0 +1,156 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueState;
+
+/**
+ * Manages the {@link QueueState} transitions of the queues reachable through
+ * a {@link SchedulerQueueContext}.
+ *
+ * @param <T> the scheduler queue type handled by this manager
+ */
+@SuppressWarnings("rawtypes")
+@Private
+@Unstable
+public class QueueStateManager<T extends SchedulerQueue> {
+  private static final Log LOG = LogFactory.getLog(QueueStateManager.class);
+
+  private SchedulerQueueContext<T> queueContext;
+
+  /**
+   * Sets the context used to look up queues by name.
+   *
+   * @param newQueueContext the queue context to use for later lookups
+   */
+  public void initialize(SchedulerQueueContext<T> newQueueContext) {
+    this.queueContext = newQueueContext;
+  }
+
+  /**
+   * Stops a RUNNING queue and then tries to stop each of its child queues.
+   * The queue moves to STOP_RUNNING while it still has active applications
+   * and to STOPPED otherwise.
+   *
+   * @param queueName name of the queue to stop
+   * @return true if the queue was RUNNING and has been stopped; false if it
+   *         does not exist or was not RUNNING
+   */
+  @SuppressWarnings("unchecked")
+  public boolean stopQueue(String queueName) {
+    SchedulerQueue<T> queue = queueContext.getQueue(queueName);
+    if (queue == null) {
+      LOG.info("The specified queue:" + queueName + " does not exist!");
+      return false;
+    }
+    QueueState currentState = queue.getState();
+    if (currentState != QueueState.RUNNING) {
+      LOG.info("We can not stop a non-running queue:" + queueName
+          + ". The current state is " + currentState);
+      return false;
+    }
+    if (queue.haveActiveApps()) {
+      queue.updateQueueState(QueueState.STOP_RUNNING);
+    } else {
+      queue.updateQueueState(QueueState.STOPPED);
+    }
+    if (queue.getChildQueues() != null) {
+      // Children that are not RUNNING are skipped by the recursive call.
+      for (T child : queue.getChildQueues()) {
+        stopQueue(child.getQueueName());
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Moves a queue to RUNNING, provided it has no parent or its parent is
+   * already RUNNING.
+   *
+   * @param queueName name of the queue to activate
+   * @return true if the queue is RUNNING when this method returns; false if
+   *         it does not exist or its parent is not RUNNING
+   */
+  @SuppressWarnings("unchecked")
+  public boolean activateQueue(String queueName) {
+    SchedulerQueue<T> queue = queueContext.getQueue(queueName);
+    if (queue == null) {
+      LOG.info("The specified queue:" + queueName + " does not exist!");
+      return false;
+    }
+    if (queue.getState() == QueueState.RUNNING) {
+      LOG.info("The specified queue:" + queueName
+          + " is already in the RUNNING state.");
+      return true;
+    }
+    T parent = queue.getParent();
+    if (parent == null || parent.getState() == QueueState.RUNNING) {
+      queue.updateQueueState(QueueState.RUNNING);
+      return true;
+    }
+    LOG.info("The parent Queue:" + parent.getQueueName() + " is not running."
+        + " Please activate the parent queue first");
+    return false;
+  }
+
+  /**
+   * Completes a pending stop: a queue in STOP_RUNNING that no longer has
+   * active applications moves to STOPPED. Only the named queue is updated.
+   *
+   * @param queueName name of the queue in which an application finished
+   */
+  @SuppressWarnings("unchecked")
+  public void appFinished(String queueName) {
+    SchedulerQueue<T> queue = queueContext.getQueue(queueName);
+    if (queue == null) {
+      // Unknown queue: report it like the other operations instead of
+      // failing with a NullPointerException.
+      LOG.info("The specified queue:" + queueName + " does not exist!");
+      return;
+    }
+    if (queue.getState() == QueueState.STOP_RUNNING
+        && !queue.haveActiveApps()) {
+      queue.updateQueueState(QueueState.STOPPED);
+    }
+  }
+
+  /**
+   * Tells whether a queue may be deleted, which requires it to be STOPPED.
+   *
+   * @param queueName name of the queue to check
+   * @return true if the queue exists and is STOPPED; false otherwise
+   */
+  @SuppressWarnings("unchecked")
+  public boolean canDelete(String queueName) {
+    SchedulerQueue<T> queue = queueContext.getQueue(queueName);
+    if (queue == null) {
+      LOG.info("The specified queue:" + queueName + " does not exist!");
+      return false;
+    }
+    if (queue.getState() == QueueState.STOPPED) {
+      return true;
+    }
+    LOG.info("Need to stop the specific queue:" + queueName + " first.");
+    return false;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
new file mode 100644
index 0000000..fc18187
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
@@ -0,0 +1,62 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import org.apache.hadoop.yarn.api.records.QueueState;
+
+/**
+ * A scheduler {@link Queue} that is part of a queue hierarchy and carries a
+ * {@link QueueState}.
+ *
+ * @param <T> the concrete queue type used for parent and child queues
+ */
+@SuppressWarnings("rawtypes")
+public interface SchedulerQueue<T extends SchedulerQueue> extends Queue {
+
+  /**
+   * Get the child queues of this queue.
+   * @return the child queues; QueueStateManager tolerates a null result
+   */
+  List<T> getChildQueues();
+
+  /**
+   * Get the parent queue of this queue.
+   * @return the parent queue; QueueStateManager treats null as "no parent"
+   */
+  T getParent();
+
+  /**
+   * Get the current state of this queue.
+   * @return the queue state
+   */
+  QueueState getState();
+
+  /**
+   * Update the state of this queue.
+   * @param state the new queue state
+   */
+  void updateQueueState(QueueState state);
+
+  /**
+   * Check whether this queue has active applications.
+   * @return true if this queue has active applications
+   */
+  boolean haveActiveApps();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java
index e091e32..8d16cd0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueContext.java
@@ -24,9 +24,10 @@
 /**
  * Context of the Queues in Scheduler.
  */
+@SuppressWarnings("rawtypes")
 @Unstable
 @Private
-public interface SchedulerQueueContext<T extends Queue> {
+public interface SchedulerQueueContext<T extends SchedulerQueue> {
 
   T getRootQueue();
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 096f5ea..23c0287 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -738,4 +738,21 @@ public Resource getTotalKillableResource(String partition) {
     return csContext.getPreemptionManager().getKillableContainers(queueName,
         partition);
   }
+
+  /**
+   * Sets the state of this queue while holding the queue's write lock.
+   *
+   * @param queueState the new queue state
+   */
+  @Override
+  public void updateQueueState(QueueState queueState) {
+    // Acquire the lock before entering try so that finally never unlocks
+    // a lock this thread failed to acquire.
+    this.writeLock.lock();
+    try {
+      this.state = queueState;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index daf7790..10c7bb2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -51,8 +52,7 @@
  */
 @Stable
 @Private
-public interface CSQueue
-extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+public interface CSQueue extends SchedulerQueue<CSQueue> {
   /**
    * Get the parent <code>Queue</code>.
    * @return the parent queue
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1f17a21..2cf76f3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -39,6 +39,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -95,6 +96,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -146,6 +148,7 @@
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
   private CapacitySchedulerQueueContext queueContext;
+  private QueueStateManager<CSQueue> queueStateManager;
 
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -290,6 +293,7 @@ private void initScheduler(Configuration configuration) throws
     this.queueContext.setCapacitySchedulerContext(this);
     this.queueContext.setYarnAuthorizationProvider(
         YarnAuthorizationProvider.getInstance(yarnConf));
+    this.queueStateManager = new QueueStateManager<>();
     this.activitiesManager = new ActivitiesManager(rmContext);
     activitiesManager.init(conf);
     initializeQueues(this.conf);
@@ -505,6 +509,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
 
     this.queueContext.initializeQueues(conf);
+    this.queueStateManager.initialize(this.queueContext);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     updatePlacementRules();
 
@@ -517,6 +522,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
   throws IOException {
     // Parse new queues
     this.queueContext.reinitializeQueues(newConf);
+    this.queueStateManager.initialize(this.queueContext);
     updatePlacementRules();
 
     labelManager.reinitializeQueueLabels(getQueueToLabels());
@@ -2078,4 +2084,14 @@ public PreemptionManager getPreemptionManager() {
   public ResourceUsage getClusterResourceUsage() {
     return getRootQueue().getQueueResourceUsage();
   }
+
+  /**
+   * Get the manager that drives the state transitions of the queues.
+   * @return the queue state manager created by this scheduler
+   */
+  @Private
+  @VisibleForTesting
+  public QueueStateManager<CSQueue> getQueueStateManager() {
+    return this.queueStateManager;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3c51961..60ccd78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -2284,4 +2284,13 @@ public Resource getClusterResource() {
       return clusterResource;
     }
   }
+
+  /**
+   * Check whether this queue has active applications.
+   * @return true if getNumApplications() reports at least one application
+   */
+  @Override
+  public boolean haveActiveApps() {
+    return getNumApplications() > 0;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index a69af6e..d83d890 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -1062,4 +1062,13 @@ private void killContainersToEnforceMaxQueueCapacity(String partition,
       }
     }
   }
+
+  /**
+   * Check whether this queue has active applications.
+   * @return true if getNumApplications() reports at least one application
+   */
+  @Override
+  public boolean haveActiveApps() {
+    return getNumApplications() > 0;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
new file mode 100644
index 0000000..d773d34
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -0,0 +1,124 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the queue state transitions driven by {@code QueueStateManager} on
+ * the queues of a {@link CapacityScheduler}.
+ */
+public class TestQueueStateManager {
+  private static final String Q1 = "q1";
+  private static final String Q2 = "q2";
+  private static final String Q3 = "q3";
+
+  private final static String Q1_PATH =
+      CapacitySchedulerConfiguration.ROOT + "." + Q1;
+  private final static String Q2_PATH =
+      Q1_PATH + "." + Q2;
+  private final static String Q3_PATH =
+      Q1_PATH + "." + Q3;
+  private CapacityScheduler cs;
+  private YarnConfiguration conf;
+
+  /**
+   * Walks Q1 and its children Q2 and Q3 through the stop, activate and
+   * stop-with-submitted-application transitions.
+   */
+  @Test
+  public void testQueueStateManager() throws AccessControlException {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+    csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
+
+    csConf.setCapacity(Q1_PATH, 100);
+    csConf.setCapacity(Q2_PATH, 50);
+    csConf.setCapacity(Q3_PATH, 50);
+
+    conf = new YarnConfiguration(csConf);
+    cs = new CapacityScheduler();
+
+    RMContext rmContext = TestUtils.getMockRMContext();
+    cs.setConf(conf);
+    cs.setRMContext(rmContext);
+    cs.init(conf);
+
+    // By default, all queues should be in the RUNNING state.
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    // Stop Q2, and verify that Q2 transitions to the STOPPED state.
+    cs.getQueueStateManager().stopQueue(Q2);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Stop Q1, and verify that Q1, as well as its child Q3,
+    // transitions to the STOPPED state.
+    cs.getQueueStateManager().stopQueue(Q1);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+    Assert.assertTrue(cs.getQueueStateManager().canDelete(Q1));
+    Assert.assertTrue(cs.getQueueStateManager().canDelete(Q2));
+    Assert.assertTrue(cs.getQueueStateManager().canDelete(Q3));
+
+    // Activating Q2 must fail while its parent Q1 is still stopped.
+    Assert.assertFalse(cs.getQueueStateManager().activateQueue(Q2));
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+    // Now activate Q1.
+    Assert.assertTrue(cs.getQueueStateManager().activateQueue(Q1));
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+
+    // Now activate Q2 and Q3.
+    Assert.assertTrue(cs.getQueueStateManager().activateQueue(Q2));
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+    Assert.assertTrue(cs.getQueueStateManager().activateQueue(Q3));
+    Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+    Assert.assertFalse(cs.getQueueStateManager().canDelete(Q1));
+    Assert.assertFalse(cs.getQueueStateManager().canDelete(Q2));
+    Assert.assertFalse(cs.getQueueStateManager().canDelete(Q3));
+
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    String userName = "testUser";
+    cs.getQueue(Q2).submitApplication(appId, userName, Q2);
+
+    // An application was submitted to Q2, so Q1 is expected to wait.
+    cs.getQueueStateManager().stopQueue(Q1);
+
+    Assert.assertEquals(QueueState.STOP_RUNNING, cs.getQueue(Q1).getState());
+
+    // A notification for a queue that does not exist must be ignored.
+    cs.getQueueStateManager().appFinished("unknownQueue");
+
+    cs.getQueue(Q2).finishApplication(appId, userName);
+    cs.getQueueStateManager().appFinished(Q1);
+    Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+  }
+}