diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
index 2bc0407..acca23b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
@@ -29,6 +29,10 @@
*
* - {@link #RUNNING} - normal state.
* - {@link #STOPPED} - not accepting new application submissions.
+ * -
+ * {@link #DRAINING} - not accepting new application submissions
+ * and waiting for applications finish.
+ *
*
*
* @see QueueInfo
@@ -41,7 +45,12 @@
* Stopped - Not accepting submissions of new applications.
*/
STOPPED,
-
+
+ /**
+ * Draining - Not accepting submissions of new applications,
+ * and waiting for applications finish
+ */
+ DRAINING,
/**
* Running - normal operation.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
new file mode 100644
index 0000000..8455c85
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+
+/**
+ *
+ * QueueStateManager who would manage the queue state.
+ *
+ */
+@SuppressWarnings("rawtypes")
+@Private
+@Unstable
+public class QueueStateManager {
+
+ private static final Log LOG = LogFactory.getLog(QueueStateManager.class);
+
+ private SchedulerQueueManager queueContext;
+
+ //TODO: YARN-5755. Better handling STOP queue.
+ //TODO: YARN-5724. Better handling Delete queue.
+ public void initialize(SchedulerQueueManager newQueueManager) {
+ this.queueContext = newQueueManager;
+ }
+
+ /**
+ * Stop the queue.
+ * @param queueName the queue name
+ * @return true if the queue can be stop.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized boolean stopQueue(String queueName) {
+ SchedulerQueue queue = queueContext.getQueue(queueName);
+ if (queue == null) {
+ LOG.info("The specified queue:" + queueName + " does not exist!");
+ return false;
+ }
+ QueueState currentState = queue.getState();
+ if (currentState != QueueState.RUNNING) {
+ LOG.info("We can not stop a non-running queue:" + queueName
+ + ". The current state is " + queue.getState());
+ return false;
+ } else {
+ if (queue.haveActiveApps()) {
+ queue.updateQueueState(QueueState.DRAINING);
+ } else {
+ queue.updateQueueState(QueueState.STOPPED);
+ }
+ if (queue.getChildQueues() != null) {
+ for(T child : queue.getChildQueues()) {
+ stopQueue(child.getQueueName());
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Active the queue.
+ * @param queueName the queue name
+ * @return true if the queue can be activated
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized boolean activateQueue(String queueName) {
+ SchedulerQueue queue = queueContext.getQueue(queueName);
+ if (queue == null) {
+ LOG.info("The specified queue:" + queueName + " does not exist!");
+ return false;
+ }
+ if (queue.getState() == QueueState.RUNNING) {
+ LOG.info("The specified queue:" + queueName
+ + " is already in the RUNNING state.");
+ return true;
+ } else {
+ T parent = queue.getParent();
+ if (parent == null || parent.getState() == QueueState.RUNNING) {
+ queue.updateQueueState(QueueState.RUNNING);
+ return true;
+ }
+ LOG.info("The parent Queue:" + parent.getQueueName() + " is not running."
+ + " Please activate the parent queue first");
+ return false;
+ }
+ }
+
+ /**
+ * Triggered if any applications finish.
+ * @param queueName the queue name
+ */
+ @SuppressWarnings("unchecked")
+ public void appFinished(String queueName) {
+ SchedulerQueue queue = queueContext.getQueue(queueName);
+ if (queue.getState() == QueueState.DRAINING) {
+ if (!queue.haveActiveApps()) {
+ queue.updateQueueState(QueueState.STOPPED);
+ }
+ }
+ }
+
+ /**
+ * Whether this queue can be deleted.
+ * @param queueName the queue name
+ * @return true if the queue can be deleted
+ */
+ @SuppressWarnings("unchecked")
+ public boolean canDelete(String queueName) {
+ SchedulerQueue queue = queueContext.getQueue(queueName);
+ if (queue == null) {
+ LOG.info("The specified queue:" + queueName + " does not exist!");
+ return false;
+ }
+ if (queue.getState() == QueueState.STOPPED){
+ return true;
+ }
+ LOG.info("Need to stop the specific queue:" + queueName + " first.");
+ return false;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
new file mode 100644
index 0000000..f45548c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.yarn.api.records.QueueState;
+
+/**
+ *
+ * Represents a queue in Scheduler.
+ *
+ */
+@SuppressWarnings("rawtypes")
+@LimitedPrivate("yarn")
+public interface SchedulerQueue extends Queue {
+
+ /**
+ * Get list of child queues.
+ * @return a list of child queues
+ */
+ List getChildQueues();
+
+ /**
+ * Get the parent queue.
+ * @return the parent queue
+ */
+ T getParent();
+
+ /**
+ * Get current queue state.
+ * @return the queue state
+ */
+ QueueState getState();
+
+ /**
+ * Update the queue state.
+ * @param state the queue state
+ */
+ void updateQueueState(QueueState state);
+
+ /**
+ * Whether there is any active application in this queue.
+ * @return true if any application is active in this queue
+ */
+ boolean haveActiveApps();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
index 92b989a..24797a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
@@ -29,9 +29,10 @@
* Context of the Queues in Scheduler.
*
*/
+@SuppressWarnings("rawtypes")
@Private
@Unstable
-public interface SchedulerQueueManager {
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index dd2f0d9..d8762fc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -74,7 +74,7 @@
final Resource minimumAllocation;
volatile Resource maximumAllocation;
- volatile QueueState state;
+ private volatile QueueState state;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
@@ -105,6 +105,9 @@
protected ReentrantReadWriteLock.ReadLock readLock;
protected ReentrantReadWriteLock.WriteLock writeLock;
+ private ReentrantReadWriteLock.ReadLock queueStateReadLock;
+ private ReentrantReadWriteLock.WriteLock queueStateWriteLock;
+
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.labelManager = cs.getRMContext().getNodeLabelManager();
@@ -132,6 +135,11 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
+
+ ReentrantReadWriteLock queueStatelock = new ReentrantReadWriteLock();
+ queueStateReadLock = queueStatelock.readLock();
+ queueStateWriteLock = queueStatelock.writeLock();
+ initializeQueueState();
}
protected void setupConfigurableCapacities() {
@@ -183,7 +191,12 @@ public int getNumContainers() {
@Override
public QueueState getState() {
- return state;
+ try {
+ this.queueStateReadLock.lock();
+ return state;
+ } finally {
+ this.queueStateReadLock.unlock();
+ }
}
@Override
@@ -291,8 +304,6 @@ void setupQueueConfigs(Resource clusterResource)
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
- initializeQueueState();
-
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
// Update metrics
@@ -338,19 +349,19 @@ private void initializeQueueState() {
.getConfiguredState(getQueuePath());
QueueState parentState = parent.getState();
if (configuredState == null) {
- this.state = parentState;
+ updateQueueState(parentState);
} else if (configuredState == QueueState.RUNNING
&& parentState == QueueState.STOPPED) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueueName() + " state is STOPPED, "
+ "child queue:" + queueName + " state cannot be RUNNING.");
} else {
- this.state = configuredState;
+ updateQueueState(configuredState);
}
} else {
// if this is the root queue, get the state from the configuration.
// if the state is not set, use RUNNING as default state.
- this.state = csContext.getConfiguration().getState(getQueuePath());
+ updateQueueState(csContext.getConfiguration().getState(getQueuePath()));
}
}
@@ -364,7 +375,7 @@ protected QueueInfo getQueueInfo() {
queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
- queueInfo.setQueueState(state);
+ queueInfo.setQueueState(getState());
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
@@ -837,4 +848,14 @@ public boolean accept(Resource cluster,
return true;
}
+
+ @Override
+ public void updateQueueState(QueueState queueState) {
+ try {
+ this.queueStateWriteLock.lock();
+ this.state = queueState;
+ } finally {
+ this.queueStateWriteLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index baf60e4..bd32272 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -56,8 +57,7 @@
*/
@Stable
@Private
-public interface CSQueue
-extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+public interface CSQueue extends SchedulerQueue {
/**
* Get the parent Queue.
* @return the parent queue
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e42b20c..d2b58aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2451,4 +2451,9 @@ public int getAsyncSchedulingPendingBacklogs() {
}
return 0;
}
+
+ @Override
+ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
+ return this.queueManager;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index c41a7bf..7d29619 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -83,4 +83,6 @@
ResourceUsage getClusterResourceUsage();
ActivitiesManager getActivitiesManager();
+
+ CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 7a6ce56..6a3c08a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
@@ -86,6 +87,9 @@ public CSQueue hook(CSQueue queue) {
private CSQueue root;
private final RMNodeLabelsManager labelManager;
+ private QueueStateManager
+ queueStateManager;
+
/**
* Construct the service.
* @param conf the configuration
@@ -95,6 +99,7 @@ public CapacitySchedulerQueueManager(Configuration conf,
RMNodeLabelsManager labelManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager;
+ this.queueStateManager = new QueueStateManager<>();
}
@Override
@@ -142,6 +147,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf)
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
+ this.queueStateManager.initialize(this);
LOG.info("Initialized root queue " + root);
}
@@ -170,6 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels());
+ this.queueStateManager.initialize(this);
}
/**
@@ -358,4 +365,10 @@ public Priority getDefaultPriorityForQueue(String queueName) {
}
return queueToLabels;
}
+
+ @Private
+ public QueueStateManager
+ getQueueStateManager() {
+ return this.queueStateManager;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9661206..f3054ea 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -273,7 +273,7 @@ protected void setupQueueConfigs(Resource clusterResource)
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + numContainers
- + " [= currentNumContainers ]" + "\n" + "state = " + state
+ + " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+ nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
@@ -866,6 +866,11 @@ private void addApplicationAttempt(FiCaSchedulerApp application,
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application);
+
+ // Inform the queueStateManager
+ this.csContext.getCapacitySchedulerQueueManager().getQueueStateManager()
+ .appFinished(queueName);
+
// Inform the parent queue
getParent().finishApplication(application, user);
}
@@ -2412,4 +2417,9 @@ public Resource getClusterResource() {
return clusterResource;
}
}
+
+ @Override
+ public boolean haveActiveApps() {
+ return getNumApplications() > 0;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index fd0c68b..88a9c7f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -133,7 +133,7 @@ void setupQueueConfigs(Resource clusterResource)
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
- .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+ .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally {
@@ -345,7 +345,7 @@ public void submitApplication(ApplicationId applicationId, String user,
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
- if (state != QueueState.RUNNING) {
+ if (getState() != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
@@ -402,7 +402,11 @@ private void addApplication(ApplicationId applicationId,
public void finishApplication(ApplicationId application, String user) {
removeApplication(application, user);
-
+
+ // Inform the queueStateManager
+ this.csContext.getCapacitySchedulerQueueManager().getQueueStateManager()
+ .appFinished(queueName);
+
// Inform the parent queue
if (parent != null) {
parent.finishApplication(application, user);
@@ -1040,4 +1044,9 @@ public void apply(Resource cluster,
parent.apply(cluster, request);
}
}
+
+ @Override
+ public boolean haveActiveApps() {
+ return getNumApplications() > 0;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 2ce5fcb..41a603a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
@@ -953,6 +955,7 @@ public void testUserLimits() throws Exception {
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testComputeUserLimitAndSetHeadroom() throws IOException {
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
@@ -974,6 +977,15 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
+ = mock(CapacitySchedulerQueueManager.class);
+ QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
+ doNothing().when(mockQueueStateManager).appFinished(anyString());
+ when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
+ mockQueueStateManager);
+ when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
+ mockCapacitySchedulerQueueManager);
+
//our test plan contains three cases
//1. single user dominate the queue, we test the headroom
//2. two users, but user_0 is assigned 100% of the queue resource,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
new file mode 100644
index 0000000..b704987
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test QueueStateManager.
+ *
+ */
+public class TestQueueStateManager {
+ private static final String Q1 = "q1";
+ private static final String Q2 = "q2";
+ private static final String Q3 = "q3";
+
+ private final static String Q1_PATH =
+ CapacitySchedulerConfiguration.ROOT + "." + Q1;
+ private final static String Q2_PATH =
+ Q1_PATH + "." + Q2;
+ private final static String Q3_PATH =
+ Q1_PATH + "." + Q3;
+ private CapacityScheduler cs;
+ private YarnConfiguration conf;
+
+ @Test
+ public void testQueueStateManager() throws AccessControlException {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+ csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
+
+ csConf.setCapacity(Q1_PATH, 100);
+ csConf.setCapacity(Q2_PATH, 50);
+ csConf.setCapacity(Q3_PATH, 50);
+
+ conf = new YarnConfiguration(csConf);
+ cs = new CapacityScheduler();
+
+ RMContext rmContext = TestUtils.getMockRMContext();
+ cs.setConf(conf);
+ cs.setRMContext(rmContext);
+ cs.init(conf);
+
+ @SuppressWarnings("rawtypes")
+ QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager()
+ .getQueueStateManager();
+
+ //by default, the state of both queues should be RUNNING
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+ // Stop Q2, and verify that Q2 transmits to STOPPED STATE
+ stateManager.stopQueue(Q2);
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+ // Stop Q1, and verify that Q1, as well as its child: Q3,
+ // transmits to STOPPED STATE
+ stateManager.stopQueue(Q1);
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+ Assert.assertTrue(stateManager.canDelete(Q1));
+ Assert.assertTrue(stateManager.canDelete(Q2));
+ Assert.assertTrue(stateManager.canDelete(Q3));
+
+ // Active Q2, it will fail.
+ Assert.assertFalse(stateManager.activateQueue(Q2));
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+ // Now active Q1
+ Assert.assertTrue(stateManager.activateQueue(Q1));
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+
+ // Now active Q2 and Q3
+ Assert.assertTrue(stateManager.activateQueue(Q2));
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+ Assert.assertTrue(stateManager.activateQueue(Q3));
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+ Assert.assertFalse(stateManager.canDelete(Q1));
+ Assert.assertFalse(stateManager.canDelete(Q2));
+ Assert.assertFalse(stateManager.canDelete(Q3));
+
+ ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ String userName = "testUser";
+ cs.getQueue(Q2).submitApplication(appId, userName, Q2);
+ FiCaSchedulerApp app = getMockApplication(appId, userName,
+ Resources.createResource(4, 0));
+ cs.getQueue(Q2).submitApplicationAttempt(app, userName);
+ stateManager.stopQueue(Q1);
+
+ Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+ cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
+ cs.getQueue(Q2).finishApplication(appId, userName);
+ //stateManager.appFinished(Q1);
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+ }
+
+ private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
+ Resource amResource) {
+ FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ doReturn(applicationAttemptId.getApplicationId()).
+ when(application).getApplicationId();
+ doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
+ doReturn(user).when(application).getUser();
+ doReturn(amResource).when(application).getAMResource();
+ doReturn(Priority.newInstance(0)).when(application).getPriority();
+ doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
+ .getAppAMNodePartitionName();
+ doReturn(amResource).when(application).getAMResource(
+ CommonNodeLabelsManager.NO_LABEL);
+ when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
+ .thenCallRealMethod();
+ return application;
+ }
+}