diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
index 2bc0407..acca23b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
@@ -29,6 +29,10 @@
*
* - {@link #RUNNING} - normal state.
* - {@link #STOPPED} - not accepting new application submissions.
+ * -
+ * {@link #DRAINING} - not accepting new application submissions
+ * and waiting for applications finish.
+ *
*
*
* @see QueueInfo
@@ -41,7 +45,12 @@
* Stopped - Not accepting submissions of new applications.
*/
STOPPED,
-
+
+ /**
+ * Draining - Not accepting submissions of new applications,
+ * and waiting for applications finish
+ */
+ DRAINING,
/**
* Running - normal operation.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 5a70298..a8ba740 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -427,6 +427,7 @@ message YarnClusterMetricsProto {
enum QueueStateProto {
Q_STOPPED = 1;
Q_RUNNING = 2;
+ Q_DRAINING = 3;
}
message QueueStatisticsProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
new file mode 100644
index 0000000..6e7f106
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+
+/**
+ *
+ * QueueStateManager who would manage the queue state.
+ *
+ */
+@SuppressWarnings("rawtypes")
+@Private
+@Unstable
+public class QueueStateManager {
+
+ private static final Log LOG = LogFactory.getLog(QueueStateManager.class);
+
+ private SchedulerQueueManager queueManager;
+
+ public synchronized void initialize(SchedulerQueueManager
+ newQueueManager) {
+ this.queueManager = newQueueManager;
+ }
+
+ /**
+ * Stop the queue.
+ * @param queueName the queue name
+ * @throws YarnException if the queue does not exist
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void stopQueue(String queueName) throws YarnException {
+ SchedulerQueue queue = queueManager.getQueue(queueName);
+ if (queue == null) {
+ throw new YarnException("The specified queue:" + queueName
+ + " does not exist!");
+ }
+ queue.stopQueue();
+ }
+
+ /**
+ * Active the queue.
+ * @param queueName the queue name
+ * @throws YarnException if the queue does not exist
+ * or the queue can not be activated.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void activateQueue(String queueName)
+ throws YarnException {
+ SchedulerQueue queue = queueManager.getQueue(queueName);
+ if (queue == null) {
+ throw new YarnException("The specified queue:" + queueName
+ + " does not exist!");
+ }
+ queue.activeQueue();
+ }
+
+ /**
+ * Whether this queue can be deleted.
+ * @param queueName the queue name
+ * @return true if the queue can be deleted
+ */
+ @SuppressWarnings("unchecked")
+ public boolean canDelete(String queueName) {
+ SchedulerQueue queue = queueManager.getQueue(queueName);
+ if (queue == null) {
+ LOG.info("The specified queue:" + queueName + " does not exist!");
+ return false;
+ }
+ if (queue.getState() == QueueState.STOPPED){
+ return true;
+ }
+ LOG.info("Need to stop the specific queue:" + queueName + " first.");
+ return false;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
new file mode 100644
index 0000000..f31f606
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ *
+ * Represents a queue in Scheduler.
+ *
+ */
+@SuppressWarnings("rawtypes")
+@LimitedPrivate("yarn")
+public interface SchedulerQueue extends Queue {
+
+ /**
+ * Get list of child queues.
+ * @return a list of child queues
+ */
+ List getChildQueues();
+
+ /**
+ * Get the parent queue.
+ * @return the parent queue
+ */
+ T getParent();
+
+ /**
+ * Get current queue state.
+ * @return the queue state
+ */
+ QueueState getState();
+
+ /**
+ * Update the queue state.
+ * @param state the queue state
+ */
+ void updateQueueState(QueueState state);
+
+ /**
+ * Whether there is any active application in this queue.
+ * @return true if any application is active in this queue
+ */
+ boolean haveActiveApps();
+
+ /**
+ * Stop the queue.
+ */
+ void stopQueue();
+
+ /**
+ * Active the queue.
+ * @throws YarnException if the queue can not be activated.
+ */
+ void activeQueue() throws YarnException;
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
index 92b989a..24797a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
@@ -29,9 +29,10 @@
* Context of the Queues in Scheduler.
*
*/
+@SuppressWarnings("rawtypes")
@Private
@Unstable
-public interface SchedulerQueueManager {
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 3372392..855ef87 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessRequest;
@@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -77,7 +79,7 @@
final Resource minimumAllocation;
volatile Resource maximumAllocation;
- volatile QueueState state;
+ private volatile QueueState state = null;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
@@ -292,9 +294,8 @@ void setupQueueConfigs(Resource clusterResource)
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());
- authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
-
initializeQueueState();
+ authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
@@ -335,25 +336,55 @@ void setupQueueConfigs(Resource clusterResource)
}
private void initializeQueueState() {
- // inherit from parent if state not set, only do this when we are not root
- if (parent != null) {
- QueueState configuredState = csContext.getConfiguration()
- .getConfiguredState(getQueuePath());
- QueueState parentState = parent.getState();
- if (configuredState == null) {
- this.state = parentState;
- } else if (configuredState == QueueState.RUNNING
- && parentState == QueueState.STOPPED) {
- throw new IllegalArgumentException(
- "The parent queue:" + parent.getQueueName() + " state is STOPPED, "
- + "child queue:" + queueName + " state cannot be RUNNING.");
+ QueueState current = getState();
+ QueueState configuredState = csContext.getConfiguration()
+ .getConfiguredState(getQueuePath());
+ // verify that we can not any value for State other than RUNNING/STOPPED
+ if (configuredState != null && configuredState != QueueState.RUNNING
+ && configuredState != QueueState.STOPPED) {
+ throw new IllegalArgumentException("Invalid queue state configuration."
+ + " We can only use RUNNING or STOPPED.");
+ }
+ // If we did not set state in configuration, use Running as default state
+ QueueState defaultState = QueueState.RUNNING;
+
+ if (current == null) {
+ // If current state of the queue is null, we would inherit the state
+ // from its parent. If this queue does not has parent, such as root queue,
+ // we would use the configured state.
+ QueueState parentState = (parent == null) ? null : parent.getState();
+ if (parentState == null) {
+ updateQueueState((configuredState == null) ? defaultState
+ : configuredState);
} else {
- this.state = configuredState;
+ if (configuredState == null) {
+ updateQueueState((parentState == QueueState.DRAINING) ?
+ QueueState.STOPPED : parentState);
+ } else if (configuredState == QueueState.RUNNING
+ && parentState != QueueState.RUNNING) {
+ throw new IllegalArgumentException(
+ "The parent queue:" + parent.getQueueName()
+ + " state is STOPPED, child queue:" + queueName
+ + " state cannot be RUNNING.");
+ } else {
+ updateQueueState(configuredState);
+ }
}
} else {
- // if this is the root queue, get the state from the configuration.
- // if the state is not set, use RUNNING as default state.
- this.state = csContext.getConfiguration().getState(getQueuePath());
+ // when we get a refreshQueue request from AdminService,
+ if (current == QueueState.RUNNING) {
+ if (configuredState == QueueState.STOPPED) {
+ stopQueue();
+ }
+ } else {
+ if (configuredState == QueueState.RUNNING) {
+ try {
+ activeQueue();
+ } catch (YarnException ex) {
+ throw new IllegalArgumentException(ex.getMessage());
+ }
+ }
+ }
}
}
@@ -367,7 +398,7 @@ protected QueueInfo getQueueInfo() {
queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
- queueInfo.setQueueState(state);
+ queueInfo.setQueueState(getState());
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
@@ -846,4 +877,47 @@ public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException {
// Dummy implementation
}
+
+ @Override
+ public void updateQueueState(QueueState queueState) {
+ this.state = queueState;
+ }
+
+ @Override
+ public void activeQueue() throws YarnException {
+ try {
+ this.writeLock.lock();
+ if (getState() == QueueState.RUNNING) {
+ LOG.info("The specified queue:" + queueName
+ + " is already in the RUNNING state.");
+ } else if (getState() == QueueState.DRAINING) {
+ throw new YarnException(
+ "The queue:" + queueName + " is in the Stopping process. "
+ + "Please wait for the queue getting fully STOPPED.");
+ } else {
+ CSQueue parent = getParent();
+ if (parent == null || parent.getState() == QueueState.RUNNING) {
+ updateQueueState(QueueState.RUNNING);
+ } else {
+ throw new YarnException("The parent Queue:" + parent.getQueueName()
+ + " is not running. Please activate the parent queue first");
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ protected void appFinished() {
+ try {
+ this.writeLock.lock();
+ if (getState() == QueueState.DRAINING) {
+ if (!haveActiveApps()) {
+ updateQueueState(QueueState.STOPPED);
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 550e206..e30ec39 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -56,8 +57,7 @@
*/
@Stable
@Private
-public interface CSQueue
-extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
+public interface CSQueue extends SchedulerQueue {
/**
* Get the parent Queue.
* @return the parent queue
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9a73a65..5463abd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2474,4 +2474,9 @@ public int getAsyncSchedulingPendingBacklogs() {
}
return 0;
}
+
+ @Override
+ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
+ return this.queueManager;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index c41a7bf..7d29619 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -83,4 +83,6 @@
ResourceUsage getClusterResourceUsage();
ActivitiesManager getActivitiesManager();
+
+ CapacitySchedulerQueueManager getCapacitySchedulerQueueManager();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 7a6ce56..6a3c08a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
@@ -86,6 +87,9 @@ public CSQueue hook(CSQueue queue) {
private CSQueue root;
private final RMNodeLabelsManager labelManager;
+ private QueueStateManager
+ queueStateManager;
+
/**
* Construct the service.
* @param conf the configuration
@@ -95,6 +99,7 @@ public CapacitySchedulerQueueManager(Configuration conf,
RMNodeLabelsManager labelManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager;
+ this.queueStateManager = new QueueStateManager<>();
}
@Override
@@ -142,6 +147,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf)
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
+ this.queueStateManager.initialize(this);
LOG.info("Initialized root queue " + root);
}
@@ -170,6 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels());
+ this.queueStateManager.initialize(this);
}
/**
@@ -358,4 +365,10 @@ public Priority getDefaultPriorityForQueue(String queueName) {
}
return queueToLabels;
}
+
+ @Private
+ public QueueStateManager
+ getQueueStateManager() {
+ return this.queueStateManager;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1c6471f..2dee296 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -273,7 +273,7 @@ protected void setupQueueConfigs(Resource clusterResource)
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+ "numContainers = " + numContainers
- + " [= currentNumContainers ]" + "\n" + "state = " + state
+ + " [= currentNumContainers ]" + "\n" + "state = " + getState()
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
+ " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+ nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
@@ -879,6 +879,9 @@ private void addApplicationAttempt(FiCaSchedulerApp application,
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application);
+
+ appFinished();
+
// Inform the parent queue
getParent().finishApplication(application, user);
}
@@ -2425,4 +2428,23 @@ public Resource getClusterResource() {
return clusterResource;
}
}
+
+ @Override
+ public boolean haveActiveApps() {
+ return getNumApplications() > 0;
+ }
+
+ @Override
+ public void stopQueue() {
+ try {
+ writeLock.lock();
+ if (haveActiveApps()) {
+ updateQueueState(QueueState.DRAINING);
+ } else {
+ updateQueueState(QueueState.STOPPED);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 0ba4ede..bdee9f5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -133,7 +133,7 @@ void setupQueueConfigs(Resource clusterResource)
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
- .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+ .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally {
@@ -369,7 +369,7 @@ public void validateSubmitApplication(ApplicationId applicationId,
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
- if (state != QueueState.RUNNING) {
+ if (getState() != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
@@ -411,7 +411,9 @@ private void addApplication(ApplicationId applicationId,
public void finishApplication(ApplicationId application, String user) {
removeApplication(application, user);
-
+
+ appFinished();
+
// Inform the parent queue
if (parent != null) {
parent.finishApplication(application, user);
@@ -1049,4 +1051,28 @@ public void apply(Resource cluster,
parent.apply(cluster, request);
}
}
+
+ @Override
+ public boolean haveActiveApps() {
+ return getNumApplications() > 0;
+ }
+
+ @Override
+ public void stopQueue() {
+ try {
+ this.writeLock.lock();
+ if (haveActiveApps()) {
+ updateQueueState(QueueState.DRAINING);
+ } else {
+ updateQueueState(QueueState.STOPPED);
+ }
+ if (getChildQueues() != null) {
+ for(CSQueue child : getChildQueues()) {
+ child.stopQueue();
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 2ce5fcb..f4a1f78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
@@ -953,6 +955,7 @@ public void testUserLimits() throws Exception {
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testComputeUserLimitAndSetHeadroom() throws IOException {
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
@@ -974,6 +977,14 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
+ = mock(CapacitySchedulerQueueManager.class);
+ QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
+ when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
+ mockQueueStateManager);
+ when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
+ mockCapacitySchedulerQueueManager);
+
//our test plan contains three cases
//1. single user dominate the queue, we test the headroom
//2. two users, but user_0 is assigned 100% of the queue resource,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
new file mode 100644
index 0000000..7763dac
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test QueueStateManager.
+ *
+ */
+public class TestQueueStateManager {
+ private static final String Q1 = "q1";
+ private static final String Q2 = "q2";
+ private static final String Q3 = "q3";
+
+ private final static String Q1_PATH =
+ CapacitySchedulerConfiguration.ROOT + "." + Q1;
+ private final static String Q2_PATH =
+ Q1_PATH + "." + Q2;
+ private final static String Q3_PATH =
+ Q1_PATH + "." + Q3;
+ private CapacityScheduler cs;
+ private YarnConfiguration conf;
+
+ @Test
+ public void testQueueStateManager() throws AccessControlException,
+ YarnException {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1});
+ csConf.setQueues(Q1_PATH, new String[] {Q2, Q3});
+
+ csConf.setCapacity(Q1_PATH, 100);
+ csConf.setCapacity(Q2_PATH, 50);
+ csConf.setCapacity(Q3_PATH, 50);
+
+ conf = new YarnConfiguration(csConf);
+ cs = new CapacityScheduler();
+
+ RMContext rmContext = TestUtils.getMockRMContext();
+ cs.setConf(conf);
+ cs.setRMContext(rmContext);
+ cs.init(conf);
+
+ @SuppressWarnings("rawtypes")
+ QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager()
+ .getQueueStateManager();
+
+ //by default, the state of both queues should be RUNNING
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+ // Stop Q2, and verify that Q2 transmits to STOPPED STATE
+ stateManager.stopQueue(Q2);
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+ // Stop Q1, and verify that Q1, as well as its child: Q3,
+ // transmits to STOPPED STATE
+ stateManager.stopQueue(Q1);
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+ Assert.assertTrue(stateManager.canDelete(Q1));
+ Assert.assertTrue(stateManager.canDelete(Q2));
+ Assert.assertTrue(stateManager.canDelete(Q3));
+
+ // Active Q2, it will fail.
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+
+ // Now active Q1
+ stateManager.activateQueue(Q1);
+ // Q1 should be in RUNNING state. Its children: Q2 and Q3
+ // should still be in STOPPED state.
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+ // Now active Q2 and Q3
+ stateManager.activateQueue(Q2);
+ stateManager.activateQueue(Q3);
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState());
+
+ Assert.assertFalse(stateManager.canDelete(Q1));
+ Assert.assertFalse(stateManager.canDelete(Q2));
+ Assert.assertFalse(stateManager.canDelete(Q3));
+
+ ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+ String userName = "testUser";
+ cs.getQueue(Q2).submitApplication(appId, userName, Q2);
+ FiCaSchedulerApp app = getMockApplication(appId, userName,
+ Resources.createResource(4, 0));
+ cs.getQueue(Q2).submitApplicationAttempt(app, userName);
+ stateManager.stopQueue(Q1);
+
+ Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState());
+
+ cs.getQueue(Q2).finishApplicationAttempt(app, Q2);
+ cs.getQueue(Q2).finishApplication(appId, userName);
+
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState());
+ Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState());
+ }
+
+ private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user,
+ Resource amResource) {
+ FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ doReturn(applicationAttemptId.getApplicationId()).
+ when(application).getApplicationId();
+ doReturn(applicationAttemptId).when(application).getApplicationAttemptId();
+ doReturn(user).when(application).getUser();
+ doReturn(amResource).when(application).getAMResource();
+ doReturn(Priority.newInstance(0)).when(application).getPriority();
+ doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
+ .getAppAMNodePartitionName();
+ doReturn(amResource).when(application).getAMResource(
+ CommonNodeLabelsManager.NO_LABEL);
+ when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
+ .thenCallRealMethod();
+ return application;
+ }
+}