diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java index 2bc0407..acca23b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java @@ -29,6 +29,10 @@ * * * @see QueueInfo @@ -41,7 +45,12 @@ * Stopped - Not accepting submissions of new applications. */ STOPPED, - + + /** + * Draining - Not accepting submissions of new applications, + * and waiting for applications finish + */ + DRAINING, /** * Running - normal operation. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 5a70298..a8ba740 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -427,6 +427,7 @@ message YarnClusterMetricsProto { enum QueueStateProto { Q_STOPPED = 1; Q_RUNNING = 2; + Q_DRAINING = 3; } message QueueStatisticsProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java new file mode 100644 index 0000000..6e7f106 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueStateManager.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + +/** + * + * QueueStateManager who would manage the queue state. + * + */ +@SuppressWarnings("rawtypes") +@Private +@Unstable +public class QueueStateManager { + + private static final Log LOG = LogFactory.getLog(QueueStateManager.class); + + private SchedulerQueueManager queueManager; + + public synchronized void initialize(SchedulerQueueManager + newQueueManager) { + this.queueManager = newQueueManager; + } + + /** + * Stop the queue. + * @param queueName the queue name + * @throws YarnException if the queue does not exist + */ + @SuppressWarnings("unchecked") + public synchronized void stopQueue(String queueName) throws YarnException { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + throw new YarnException("The specified queue:" + queueName + + " does not exist!"); + } + queue.stopQueue(); + } + + /** + * Active the queue. + * @param queueName the queue name + * @throws YarnException if the queue does not exist + * or the queue can not be activated. + */ + @SuppressWarnings("unchecked") + public synchronized void activateQueue(String queueName) + throws YarnException { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + throw new YarnException("The specified queue:" + queueName + + " does not exist!"); + } + queue.activeQueue(); + } + + /** + * Whether this queue can be deleted. + * @param queueName the queue name + * @return true if the queue can be deleted + */ + @SuppressWarnings("unchecked") + public boolean canDelete(String queueName) { + SchedulerQueue queue = queueManager.getQueue(queueName); + if (queue == null) { + LOG.info("The specified queue:" + queueName + " does not exist!"); + return false; + } + if (queue.getState() == QueueState.STOPPED){ + return true; + } + LOG.info("Need to stop the specific queue:" + queueName + " first."); + return false; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java new file mode 100644 index 0000000..f31f606 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * + * Represents a queue in Scheduler. + * + */ +@SuppressWarnings("rawtypes") +@LimitedPrivate("yarn") +public interface SchedulerQueue extends Queue { + + /** + * Get list of child queues. + * @return a list of child queues + */ + List getChildQueues(); + + /** + * Get the parent queue. + * @return the parent queue + */ + T getParent(); + + /** + * Get current queue state. + * @return the queue state + */ + QueueState getState(); + + /** + * Update the queue state. + * @param state the queue state + */ + void updateQueueState(QueueState state); + + /** + * Whether there is any active application in this queue. + * @return true if any application is active in this queue + */ + boolean haveActiveApps(); + + /** + * Stop the queue. + */ + void stopQueue(); + + /** + * Active the queue. + * @throws YarnException if the queue can not be activated. + */ + void activeQueue() throws YarnException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java index 92b989a..24797a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java @@ -29,9 +29,10 @@ * Context of the Queues in Scheduler. * */ +@SuppressWarnings("rawtypes") @Private @Unstable -public interface SchedulerQueueManager { /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 3372392..855ef87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessRequest; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; @@ -77,7 +79,7 @@ final Resource minimumAllocation; volatile Resource maximumAllocation; - volatile QueueState state; + private volatile QueueState state = null; final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; @@ -292,9 +294,8 @@ void setupQueueConfigs(Resource clusterResource) csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); - authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - initializeQueueState(); + authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); this.acls = csContext.getConfiguration().getAcls(getQueuePath()); @@ -335,25 +336,55 @@ void setupQueueConfigs(Resource clusterResource) } private void initializeQueueState() { - // inherit from parent if state not set, only do this when we are not root - if (parent != null) { - QueueState configuredState = csContext.getConfiguration() - .getConfiguredState(getQueuePath()); - QueueState parentState = parent.getState(); - if (configuredState == null) { - this.state = parentState; - } else if (configuredState == QueueState.RUNNING - && parentState == QueueState.STOPPED) { - throw new IllegalArgumentException( - "The parent queue:" + parent.getQueueName() + " state is STOPPED, " - + "child queue:" + queueName + " state cannot be RUNNING."); + QueueState current = getState(); + QueueState configuredState = csContext.getConfiguration() + .getConfiguredState(getQueuePath()); + // verify that we can not any value for State other than RUNNING/STOPPED + if (configuredState != null && configuredState != QueueState.RUNNING + && configuredState != QueueState.STOPPED) { + throw new IllegalArgumentException("Invalid queue state configuration." + + " We can only use RUNNING or STOPPED."); + } + // If we did not set state in configuration, use Running as default state + QueueState defaultState = QueueState.RUNNING; + + if (current == null) { + // If current state of the queue is null, we would inherit the state + // from its parent. If this queue does not has parent, such as root queue, + // we would use the configured state. + QueueState parentState = (parent == null) ? null : parent.getState(); + if (parentState == null) { + updateQueueState((configuredState == null) ? defaultState + : configuredState); } else { - this.state = configuredState; + if (configuredState == null) { + updateQueueState((parentState == QueueState.DRAINING) ? + QueueState.STOPPED : parentState); + } else if (configuredState == QueueState.RUNNING + && parentState != QueueState.RUNNING) { + throw new IllegalArgumentException( + "The parent queue:" + parent.getQueueName() + + " state is STOPPED, child queue:" + queueName + + " state cannot be RUNNING."); + } else { + updateQueueState(configuredState); + } } } else { - // if this is the root queue, get the state from the configuration. - // if the state is not set, use RUNNING as default state. - this.state = csContext.getConfiguration().getState(getQueuePath()); + // when we get a refreshQueue request from AdminService, + if (current == QueueState.RUNNING) { + if (configuredState == QueueState.STOPPED) { + stopQueue(); + } + } else { + if (configuredState == QueueState.RUNNING) { + try { + activeQueue(); + } catch (YarnException ex) { + throw new IllegalArgumentException(ex.getMessage()); + } + } + } } } @@ -367,7 +398,7 @@ protected QueueInfo getQueueInfo() { queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setCapacity(queueCapacities.getCapacity()); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); - queueInfo.setQueueState(state); + queueInfo.setQueueState(getState()); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setQueueStatistics(getQueueStatistics()); @@ -846,4 +877,47 @@ public void validateSubmitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Dummy implementation } + + @Override + public void updateQueueState(QueueState queueState) { + this.state = queueState; + } + + @Override + public void activeQueue() throws YarnException { + try { + this.writeLock.lock(); + if (getState() == QueueState.RUNNING) { + LOG.info("The specified queue:" + queueName + + " is already in the RUNNING state."); + } else if (getState() == QueueState.DRAINING) { + throw new YarnException( + "The queue:" + queueName + " is in the Stopping process. " + + "Please wait for the queue getting fully STOPPED."); + } else { + CSQueue parent = getParent(); + if (parent == null || parent.getState() == QueueState.RUNNING) { + updateQueueState(QueueState.RUNNING); + } else { + throw new YarnException("The parent Queue:" + parent.getQueueName() + + " is not running. Please activate the parent queue first"); + } + } + } finally { + this.writeLock.unlock(); + } + } + + protected void appFinished() { + try { + this.writeLock.lock(); + if (getState() == QueueState.DRAINING) { + if (!haveActiveApps()) { + updateQueueState(QueueState.STOPPED); + } + } + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 550e206..e30ec39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -56,8 +57,7 @@ */ @Stable @Private -public interface CSQueue -extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { +public interface CSQueue extends SchedulerQueue { /** * Get the parent Queue. * @return the parent queue diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9a73a65..5463abd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2474,4 +2474,9 @@ public int getAsyncSchedulingPendingBacklogs() { } return 0; } + + @Override + public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { + return this.queueManager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index c41a7bf..7d29619 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -83,4 +83,6 @@ ResourceUsage getClusterResourceUsage(); ActivitiesManager getActivitiesManager(); + + CapacitySchedulerQueueManager getCapacitySchedulerQueueManager(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7a6ce56..6a3c08a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; @@ -86,6 +87,9 @@ public CSQueue hook(CSQueue queue) { private CSQueue root; private final RMNodeLabelsManager labelManager; + private QueueStateManager + queueStateManager; + /** * Construct the service. * @param conf the configuration @@ -95,6 +99,7 @@ public CapacitySchedulerQueueManager(Configuration conf, RMNodeLabelsManager labelManager) { this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.labelManager = labelManager; + this.queueStateManager = new QueueStateManager<>(); } @Override @@ -142,6 +147,7 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); LOG.info("Initialized root queue " + root); } @@ -170,6 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) clusterResource)); labelManager.reinitializeQueueLabels(getQueueToLabels()); + this.queueStateManager.initialize(this); } /** @@ -358,4 +365,10 @@ public Priority getDefaultPriorityForQueue(String queueName) { } return queueToLabels; } + + @Private + public QueueStateManager + getQueueStateManager() { + return this.queueStateManager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1c6471f..2dee296 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -273,7 +273,7 @@ protected void setupQueueConfigs(Resource clusterResource) + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + "numContainers = " + numContainers - + " [= currentNumContainers ]" + "\n" + "state = " + state + + " [= currentNumContainers ]" + "\n" + "state = " + getState() + " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder @@ -879,6 +879,9 @@ private void addApplicationAttempt(FiCaSchedulerApp application, public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager activeUsersManager.deactivateApplication(user, application); + + appFinished(); + // Inform the parent queue getParent().finishApplication(application, user); } @@ -2425,4 +2428,23 @@ public Resource getClusterResource() { return clusterResource; } } + + @Override + public boolean haveActiveApps() { + return getNumApplications() > 0; + } + + @Override + public void stopQueue() { + try { + writeLock.lock(); + if (haveActiveApps()) { + updateQueueState(QueueState.DRAINING); + } else { + updateQueueState(QueueState.STOPPED); + } + } finally { + writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 0ba4ede..bdee9f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -133,7 +133,7 @@ void setupQueueConfigs(Resource clusterResource) + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities - .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + .getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + ", reservationsContinueLooking=" + reservationsContinueLooking); } finally { @@ -369,7 +369,7 @@ public void validateSubmitApplication(ApplicationId applicationId, "Cannot submit application " + "to non-leaf queue: " + queueName); } - if (state != QueueState.RUNNING) { + if (getState() != QueueState.RUNNING) { throw new AccessControlException("Queue " + getQueuePath() + " is STOPPED. Cannot accept submission of application: " + applicationId); @@ -411,7 +411,9 @@ private void addApplication(ApplicationId applicationId, public void finishApplication(ApplicationId application, String user) { removeApplication(application, user); - + + appFinished(); + // Inform the parent queue if (parent != null) { parent.finishApplication(application, user); @@ -1049,4 +1051,28 @@ public void apply(Resource cluster, parent.apply(cluster, request); } } + + @Override + public boolean haveActiveApps() { + return getNumApplications() > 0; + } + + @Override + public void stopQueue() { + try { + this.writeLock.lock(); + if (haveActiveApps()) { + updateQueueState(QueueState.DRAINING); + } else { + updateQueueState(QueueState.STOPPED); + } + if (getChildQueues() != null) { + for(CSQueue child : getChildQueues()) { + child.stopQueue(); + } + } + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 2ce5fcb..f4a1f78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -73,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; @@ -953,6 +955,7 @@ public void testUserLimits() throws Exception { } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test public void testComputeUserLimitAndSetHeadroom() throws IOException { LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); @@ -974,6 +977,14 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager + = mock(CapacitySchedulerQueueManager.class); + QueueStateManager mockQueueStateManager = mock(QueueStateManager.class); + when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn( + mockQueueStateManager); + when(csContext.getCapacitySchedulerQueueManager()).thenReturn( + mockCapacitySchedulerQueueManager); + //our test plan contains three cases //1. single user dominate the queue, we test the headroom //2. two users, but user_0 is assigned 100% of the queue resource, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java new file mode 100644 index 0000000..7763dac --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test QueueStateManager. + * + */ +public class TestQueueStateManager { + private static final String Q1 = "q1"; + private static final String Q2 = "q2"; + private static final String Q3 = "q3"; + + private final static String Q1_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q1; + private final static String Q2_PATH = + Q1_PATH + "." + Q2; + private final static String Q3_PATH = + Q1_PATH + "." + Q3; + private CapacityScheduler cs; + private YarnConfiguration conf; + + @Test + public void testQueueStateManager() throws AccessControlException, + YarnException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {Q1}); + csConf.setQueues(Q1_PATH, new String[] {Q2, Q3}); + + csConf.setCapacity(Q1_PATH, 100); + csConf.setCapacity(Q2_PATH, 50); + csConf.setCapacity(Q3_PATH, 50); + + conf = new YarnConfiguration(csConf); + cs = new CapacityScheduler(); + + RMContext rmContext = TestUtils.getMockRMContext(); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + + @SuppressWarnings("rawtypes") + QueueStateManager stateManager = cs.getCapacitySchedulerQueueManager() + .getQueueStateManager(); + + //by default, the state of both queues should be RUNNING + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + // Stop Q2, and verify that Q2 transmits to STOPPED STATE + stateManager.stopQueue(Q2); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Stop Q1, and verify that Q1, as well as its child: Q3, + // transmits to STOPPED STATE + stateManager.stopQueue(Q1); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + Assert.assertTrue(stateManager.canDelete(Q1)); + Assert.assertTrue(stateManager.canDelete(Q2)); + Assert.assertTrue(stateManager.canDelete(Q3)); + + // Active Q2, it will fail. + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + + // Now active Q1 + stateManager.activateQueue(Q1); + // Q1 should be in RUNNING state. Its children: Q2 and Q3 + // should still be in STOPPED state. + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + // Now active Q2 and Q3 + stateManager.activateQueue(Q2); + stateManager.activateQueue(Q3); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.RUNNING, cs.getQueue(Q3).getState()); + + Assert.assertFalse(stateManager.canDelete(Q1)); + Assert.assertFalse(stateManager.canDelete(Q2)); + Assert.assertFalse(stateManager.canDelete(Q3)); + + ApplicationId appId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + String userName = "testUser"; + cs.getQueue(Q2).submitApplication(appId, userName, Q2); + FiCaSchedulerApp app = getMockApplication(appId, userName, + Resources.createResource(4, 0)); + cs.getQueue(Q2).submitApplicationAttempt(app, userName); + stateManager.stopQueue(Q1); + + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.DRAINING, cs.getQueue(Q2).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q3).getState()); + + cs.getQueue(Q2).finishApplicationAttempt(app, Q2); + cs.getQueue(Q2).finishApplication(appId, userName); + + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q1).getState()); + Assert.assertEquals(QueueState.STOPPED, cs.getQueue(Q2).getState()); + } + + private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, + Resource amResource) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + doReturn(applicationAttemptId.getApplicationId()). + when(application).getApplicationId(); + doReturn(applicationAttemptId).when(application).getApplicationAttemptId(); + doReturn(user).when(application).getUser(); + doReturn(amResource).when(application).getAMResource(); + doReturn(Priority.newInstance(0)).when(application).getPriority(); + doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) + .getAppAMNodePartitionName(); + doReturn(amResource).when(application).getAMResource( + CommonNodeLabelsManager.NO_LABEL); + when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) + .thenCallRealMethod(); + return application; + } +}