amReqs, ApplicationPlacementContext placementContext, long startTime) {
this.systemClock = SystemClock.getInstance();
@@ -520,6 +524,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
}
}
+
+ this.placementContext = placementContext;
}
/**
@@ -2004,6 +2010,11 @@ public void setApplicationPriority(Priority applicationPriority) {
this.applicationPriority = applicationPriority;
}
+ @Override
+ public ApplicationPlacementContext getApplicationQueuePlacementContext() {
+ return this.placementContext;
+ }
+
/**
* Clear Unused fields to free memory.
* @param app
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 250f4e6b9a7..d1f6402da61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -78,7 +78,7 @@
final String queueName;
private final String queuePath;
volatile int numContainers;
-
+
final Resource minimumAllocation;
volatile Resource maximumAllocation;
private volatile QueueState state = null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
new file mode 100644
index 00000000000..b3d1b4738d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A container class for automatically created child leaf queues.
+ * From the user perspective this is equivalent to a LeafQueue,
+ * but functionality wise is a sub-class of ParentQueue
+ */
+public abstract class AbstractManagedParentQueue extends ParentQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractManagedParentQueue.class);
+
+ private int maxAppsForAutoCreatedQueues;
+ private int maxAppsPerUserForAutoCreatedQueues;
+ private int userLimit;
+ private float userLimitFactor;
+
+ public AbstractManagedParentQueue(CapacitySchedulerContext cs,
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
+
+ super.setupQueueConfigs(csContext.getClusterResource());
+ initializeLeafQueueConfigs();
+
+ StringBuffer queueInfo = new StringBuffer();
+ queueInfo.append("Created Managed Parent Queue: ").append(queueName)
+ .append("\nof type : [" + getClass())
+ .append("]\nwith capacity: [")
+ .append(super.getCapacity()).append("]\nwith max capacity: [")
+ .append(super.getMaximumCapacity()).append("\nwith max apps: [")
+ .append(getMaxApplicationsForAutoCreatedQueues())
+ .append("]\nwith max apps per user: [")
+ .append(getMaxApplicationsPerUserForAutoCreatedQueues())
+ .append("]\nwith user limit: [").append(getUserLimit())
+ .append("]\nwith user limit factor: [")
+ .append(getUserLimitFactor()).append("].");
+ LOG.info(queueInfo.toString());
+ }
+
+ @Override
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
+ try {
+ writeLock.lock();
+
+ // Set new configs
+ setupQueueConfigs(clusterResource);
+
+ initializeLeafQueueConfigs();
+
+ // run reinitialize on each existing queue, to trigger absolute cap
+ // recomputations
+ for (CSQueue res : this.getChildQueues()) {
+ res.reinitialize(res, clusterResource);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Initialize leaf queue configs from template configurations specified on
+ * parent queue.
+ */
+ protected void initializeLeafQueueConfigs() {
+
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+
+ final String queuePath = super.getQueuePath();
+ int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
+ if (maxApps < 0) {
+ maxApps = (int) (
+ CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
+ * getAbsoluteCapacity());
+ }
+ userLimit = conf.getUserLimit(queuePath);
+ userLimitFactor = conf.getUserLimitFactor(queuePath);
+ maxAppsForAutoCreatedQueues = maxApps;
+ maxAppsPerUserForAutoCreatedQueues =
+ (int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
+
+ }
+
+ /**
+ * Number of maximum applications for each of the auto created leaf queues.
+ *
+ * @return maxAppsForAutoCreatedQueues
+ */
+ public int getMaxApplicationsForAutoCreatedQueues() {
+ return maxAppsForAutoCreatedQueues;
+ }
+
+ /**
+ * Number of maximum applications per user for each of the auto created
+ * leaf queues.
+ *
+ * @return maxAppsPerUserForAutoCreatedQueues
+ */
+ public int getMaxApplicationsPerUserForAutoCreatedQueues() {
+ return maxAppsPerUserForAutoCreatedQueues;
+ }
+
+ /**
+ * User limit value for each of the auto created leaf queues.
+ *
+ * @return userLimit
+ */
+ public int getUserLimitForAutoCreatedQueues() {
+ return userLimit;
+ }
+
+ /**
+ * User limit factor value for each of the auto created leaf queues.
+ *
+ * @return userLimitFactor
+ */
+ public float getUserLimitFactor() {
+ return userLimitFactor;
+ }
+
+ public int getMaxAppsForAutoCreatedQueues() {
+ return maxAppsForAutoCreatedQueues;
+ }
+
+ public int getMaxAppsPerUserForAutoCreatedQueues() {
+ return maxAppsPerUserForAutoCreatedQueues;
+ }
+
+ public int getUserLimit() {
+ return userLimit;
+ }
+
+ /**
+ * Add the specified child queue.
+ * @param childQueue reference to the child queue to be added
+ * @throws SchedulerDynamicEditException
+ */
+ public void addChildQueue(CSQueue childQueue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ if (childQueue.getCapacity() > 0) {
+ throw new SchedulerDynamicEditException(
+ "Queue " + childQueue + " being added has non zero capacity.");
+ }
+ boolean added = this.childQueues.add(childQueue);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updateChildQueues (action: add queue): " + added + " "
+ + getChildQueuesToPrint());
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Remove the specified child queue.
+ * @param childQueue reference to the child queue to be removed
+ * @throws SchedulerDynamicEditException
+ */
+ public void removeChildQueue(CSQueue childQueue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ if (childQueue.getCapacity() > 0) {
+ throw new SchedulerDynamicEditException(
+ "Queue " + childQueue + " being removed has non zero capacity.");
+ }
+ Iterator qiter = childQueues.iterator();
+ while (qiter.hasNext()) {
+ CSQueue cs = qiter.next();
+ if (cs.equals(childQueue)) {
+ qiter.remove();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed child queue: {}" + cs.getQueueName());
+ }
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Remove the specified child queue.
+ * @param childQueueName name of the child queue to be removed
+ * @throws SchedulerDynamicEditException
+ */
+ public CSQueue removeChildQueue(String childQueueName)
+ throws SchedulerDynamicEditException {
+ CSQueue childQueue;
+ try {
+ writeLock.lock();
+ childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
+ childQueueName);
+ if (childQueue != null) {
+ removeChildQueue(childQueue);
+ } else {
+ throw new SchedulerDynamicEditException("Cannot find queue to delete "
+ + ": " + childQueueName);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ return childQueue;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
similarity index 63%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 3d1b7317489..4eb7cdd9d95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,35 +18,35 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.io.IOException;
-
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
- * This represents a dynamic {@link LeafQueue} managed by the
- * {@link ReservationSystem}
- *
+ * Leaf queues which are auto created by an underkying implementation of
+ * AbstractManagedParentQueue. Eg: PlanQueue for reservations or
+ * ManagedParentQueue for auto created dynamic queues
*/
-public class ReservationQueue extends LeafQueue {
+public class AutoCreatedLeafQueue extends LeafQueue {
private static final Logger LOG = LoggerFactory
- .getLogger(ReservationQueue.class);
+ .getLogger(AutoCreatedLeafQueue.class);
- private PlanQueue parent;
+ private AbstractManagedParentQueue parent;
- public ReservationQueue(CapacitySchedulerContext cs, String queueName,
- PlanQueue parent) throws IOException {
+ public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
+ AbstractManagedParentQueue parent) throws IOException {
super(cs, queueName, parent, null);
- // the following parameters are common to all reservation in the plan
- updateQuotas(parent.getUserLimitForReservation(),
+
+ updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
parent.getUserLimitFactor(),
- parent.getMaxApplicationsForReservations(),
- parent.getMaxApplicationsPerUserForReservation());
+ parent.getMaxApplicationsForAutoCreatedQueues(),
+ parent.getMaxApplicationsPerUserForAutoCreatedQueues());
+
this.parent = parent;
}
@@ -55,21 +55,18 @@ public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
- // Sanity check
- if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
- .getQueuePath().equals(getQueuePath())) {
- throw new IOException(
- "Trying to reinitialize " + getQueuePath() + " from "
- + newlyParsedQueue.getQueuePath());
- }
+
+ validate(newlyParsedQueue);
+
super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
- updateQuotas(parent.getUserLimitForReservation(),
+ updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
parent.getUserLimitFactor(),
- parent.getMaxApplicationsForReservations(),
- parent.getMaxApplicationsPerUserForReservation());
+ parent.getMaxApplicationsForAutoCreatedQueues(),
+ parent.getMaxApplicationsPerUserForAutoCreatedQueues());
+
} finally {
writeLock.unlock();
}
@@ -77,10 +74,10 @@ public void reinitialize(CSQueue newlyParsedQueue,
/**
* This methods to change capacity for a queue and adjusts its
- * absoluteCapacity
- *
+ * absoluteCapacity.
+ *
* @param entitlement the new entitlement for the queue (capacity,
- * maxCapacity, etc..)
+ * maxCapacity)
* @throws SchedulerDynamicEditException
*/
public void setEntitlement(QueueEntitlement entitlement)
@@ -94,8 +91,6 @@ public void setEntitlement(QueueEntitlement entitlement)
}
setCapacity(capacity);
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
- // note: we currently set maxCapacity to capacity
- // this might be revised later
setMaxCapacity(entitlement.getMaxCapacity());
if (LOG.isDebugEnabled()) {
LOG.debug("successfully changed to " + capacity + " for queue " + this
@@ -106,12 +101,14 @@ public void setEntitlement(QueueEntitlement entitlement)
}
}
- private void updateQuotas(int userLimit, float userLimitFactor,
- int maxAppsForReservation, int maxAppsPerUserForReservation) {
- setUserLimit(userLimit);
- setUserLimitFactor(userLimitFactor);
- setMaxApplications(maxAppsForReservation);
- maxApplicationsPerUser = maxAppsPerUserForReservation;
+ private void validate(final CSQueue newlyParsedQueue) throws IOException {
+ if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Error trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
+ }
+
}
@Override
@@ -119,4 +116,14 @@ protected void setupConfigurableCapacities() {
CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
queueCapacities, parent == null ? null : parent.getQueueCapacities());
}
+
+ private void updateApplicationAndUserLimits(int userLimit,
+ float userLimitFactor,
+ int maxAppsForAutoCreatedQueues,
+ int maxAppsPerUserForAutoCreatedQueues) {
+ setUserLimit(userLimit);
+ setUserLimitFactor(userLimitFactor);
+ setMaxApplications(maxAppsForAutoCreatedQueues);
+ setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d91aa55a487..e4e372e019b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -64,11 +64,11 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -138,6 +138,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -171,6 +173,8 @@
private int maxAssignPerHeartbeat;
+ private final Clock clock = new UTCClock();
+
private CSConfigurationProvider csConfProvider;
@Override
@@ -454,6 +458,11 @@ long getAsyncScheduleInterval() {
private final static Random random = new Random(System.currentTimeMillis());
+ @Override
+ public Clock getClock() {
+ return clock;
+ }
+
/**
* Schedule on all nodes by starting at a random point.
* @param cs
@@ -560,44 +569,17 @@ public int getPendingBacklogs() {
}
@VisibleForTesting
- public UserGroupMappingPlacementRule
- getUserGroupMappingPlacementRule() throws IOException {
+ public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
try {
readLock.lock();
- boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
- LOG.info(
- "Initialized queue mappings, override: " + overrideWithQueueMappings);
-
- // Get new user/group mappings
- List newMappings = conf.getQueueMappings();
- // check if mappings refer to valid queues
- for (QueueMapping mapping : newMappings) {
- String mappingQueue = mapping.getQueue();
- if (!mappingQueue.equals(
- UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
- .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
- CSQueue queue = getQueue(mappingQueue);
- if (queue == null || !(queue instanceof LeafQueue)) {
- throw new IOException(
- "mapping contains invalid or non-leaf queue " + mappingQueue);
- }
- }
- }
-
- // initialize groups if mappings are present
- if (newMappings.size() > 0) {
- Groups groups = new Groups(conf);
- return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
- newMappings, groups);
- }
-
- return null;
+ return UserGroupMappingPlacementRule.get(this);
} finally {
readLock.unlock();
}
}
- private void updatePlacementRules() throws IOException {
+ @VisibleForTesting
+ void updatePlacementRules() throws IOException {
// Initialize placement rules
Collection placementRuleStrs = conf.getStringCollection(
YarnConfiguration.QUEUE_PLACEMENT_RULES);
@@ -731,28 +713,56 @@ private void addApplicationOnRecovery(
}
}
- private void addApplication(ApplicationId applicationId,
- String queueName, String user, Priority priority) {
+ private void addApplication(ApplicationId applicationId, String queueName,
+ String user, Priority priority) {
try {
writeLock.lock();
if (isSystemAppsLimitReached()) {
String message = "Maximum system application limit reached,"
+ "cannot accept submission of application: " + applicationId;
- this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
- applicationId, RMAppEventType.APP_REJECTED, message));
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
return;
}
// Sanity checks.
CSQueue queue = getQueue(queueName);
+
if (queue == null) {
- String message =
- "Application " + applicationId + " submitted by user " + user
- + " to unknown queue: " + queueName;
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
- message));
- return;
+ //Could be a potential auto-created leaf queue
+ try {
+ LeafQueue autoCreatedLeafQueue = autoCreateLeafQueue(applicationId);
+ if (autoCreatedLeafQueue == null) {
+ final String message =
+ "Application " + applicationId + " submitted by user " + user
+ + " to unknown queue: " + queueName;
+
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ } else{
+ queue = autoCreatedLeafQueue;
+ }
+
+ } catch (YarnException e) {
+ LOG.error("Could not auto-create leaf queue due to : ", e);
+ final String message =
+ "Application " + applicationId + " submission by user " + user
+ + " to queue: " + queueName + " failed : " + e.getMessage();
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ } catch (IOException e) {
+ final String message =
+ "Application " + applicationId + " submission by user " + user
+ + " to queue: " + queueName + " failed : " + e.getMessage();
+ LOG.error("Could not auto-create leaf queue due to : ", e);
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ }
}
+
if (!(queue instanceof LeafQueue)) {
String message =
"Application " + applicationId + " submitted by user " + user
@@ -761,7 +771,37 @@ private void addApplication(ApplicationId applicationId,
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
return;
+ } else if (queue instanceof AutoCreatedLeafQueue && queue
+ .getParent() instanceof ManagedParentQueue) {
+
+ RMApp rmApp = this.rmContext.getRMApps().get(applicationId);
+ ApplicationPlacementContext placementContext =
+ rmApp.getApplicationQueuePlacementContext();
+
+ if (placementContext == null) {
+ String message =
+ "Application " + applicationId + " submission by user " + user
+ + " to queue: " + queueName + " failed : "
+ + "Queue mapping does not exist for user";
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ } else if (!queue.getParent().getQueueName().equals(
+ placementContext.getParentQueue())) {
+ String message =
+ "Auto created Leaf queue " + placementContext.getQueue() + " already exists under " + queue
+ .getParent().getQueuePath()
+ + ".But Queue mapping has a different parent queue "
+ + placementContext.getParentQueue()
+ + " for the specified user : " + user;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+ message));
+ return;
+ }
}
+
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);
@@ -1726,7 +1766,7 @@ public FiCaSchedulerNode getNode(NodeId nodeId) {
@Override
@Lock(Lock.NoLock.class)
- public void recover(RMState state) throws Exception {
+ public void recover(RMStateStore.RMState state) throws Exception {
// NOT IMPLEMENTED
}
@@ -1921,12 +1961,12 @@ public void removeQueue(String queueName)
writeLock.lock();
LOG.info("Removing queue: " + queueName);
CSQueue q = this.getQueue(queueName);
- if (!(q instanceof ReservationQueue)) {
+ if (!(q instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"The queue that we are asked " + "to remove (" + queueName
- + ") is not a ReservationQueue");
+ + ") is not a AutoCreatedLeafQueue");
}
- ReservationQueue disposableLeafQueue = (ReservationQueue) q;
+ AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q;
// at this point we should have no more apps
if (disposableLeafQueue.getNumApplications() > 0) {
throw new SchedulerDynamicEditException(
@@ -1936,9 +1976,11 @@ public void removeQueue(String queueName)
+ " pending apps");
}
- ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
+ ((AbstractManagedParentQueue) disposableLeafQueue.getParent())
+ .removeChildQueue(q);
this.queueManager.removeQueue(queueName);
- LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
+ LOG.info("Removal of AutoCreatedLeafQueue "
+ + queueName + " has succeeded");
} finally {
writeLock.unlock();
}
@@ -1949,25 +1991,27 @@ public void addQueue(Queue queue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
- if (!(queue instanceof ReservationQueue)) {
+ if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
- "Queue " + queue.getQueueName() + " is not a ReservationQueue");
+ "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue");
}
- ReservationQueue newQueue = (ReservationQueue) queue;
+ AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
- if (newQueue.getParent() == null || !(newQueue
- .getParent() instanceof PlanQueue)) {
+ if (newQueue.getParent() == null
+ || !(AbstractManagedParentQueue.class.
+ isAssignableFrom(newQueue.getParent().getClass()))) {
throw new SchedulerDynamicEditException(
"ParentQueue for " + newQueue.getQueueName()
- + " is not properly set (should be set and be a PlanQueue)");
+ + " is not properly set (should be set and be a PlanQueue or AutoCreateEnableParentQueue)");
}
- PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
+ AbstractManagedParentQueue parentPlan =
+ (AbstractManagedParentQueue) newQueue.getParent();
String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue);
this.queueManager.addQueue(queuename, newQueue);
- LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
+ LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
} finally {
writeLock.unlock();
}
@@ -1981,21 +2025,22 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
ParentQueue parent = (ParentQueue) queue.getParent();
- if (!(queue instanceof ReservationQueue)) {
+ if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException(
"Entitlement can not be" + " modified dynamically since queue "
- + inQueue + " is not a ReservationQueue");
+ + inQueue + " is not a AutoCreatedLeafQueue");
}
- if (!(parent instanceof PlanQueue)) {
+ if (parent == null
+ || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) {
throw new SchedulerDynamicEditException(
- "The parent of ReservationQueue " + inQueue
- + " must be an PlanQueue");
+ "The parent of AutoCreatedLeafQueue " + inQueue
+ + " must be a PlanQueue/AutoCreateEnabledParentQueue");
}
- ReservationQueue newQueue = (ReservationQueue) queue;
+ AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
- float sumChilds = ((PlanQueue) parent).sumOfChildCapacities();
+ float sumChilds = parent.sumOfChildCapacities();
float newChildCap =
sumChilds - queue.getCapacity() + entitlement.getCapacity();
@@ -2010,12 +2055,13 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
newQueue.setEntitlement(entitlement);
} else{
throw new SchedulerDynamicEditException(
- "Sum of child queues would exceed 100% for PlanQueue: " + parent
- .getQueueName());
+ "Sum of child queues should exceed 100% for auto creating parent "
+ + "queue : " + parent.getQueueName());
}
LOG.info(
- "Set entitlement for ReservationQueue " + inQueue + " to " + queue
- .getCapacity() + " request was (" + entitlement.getCapacity()
+ "Set entitlement for AutoCreatedLeafQueue " + inQueue
+ + " to " + queue.getCapacity() +
+ " request was (" + entitlement.getCapacity()
+ ")");
} finally {
writeLock.unlock();
@@ -2631,4 +2677,48 @@ public MutableConfigurationProvider getMutableConfProvider() {
}
return null;
}
+
+ private LeafQueue autoCreateLeafQueue(final ApplicationId applicationId)
+ throws IOException, YarnException {
+
+ AutoCreatedLeafQueue autoCreatedLeafQueue = null;
+ RMApp rmApp = this.rmContext.getRMApps().get(applicationId);
+ ApplicationPlacementContext placementContext =
+ rmApp.getApplicationQueuePlacementContext();
+
+ if (placementContext != null) {
+
+ String leafQueueName = placementContext.getQueue();
+ String parentQueueName = placementContext.getParentQueue();
+
+ if (!StringUtils.isEmpty(parentQueueName)) {
+ CSQueue parentQueue = getQueue(parentQueueName);
+
+ if (parentQueue != null
+ && conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
+
+ ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) parentQueue;
+ autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
+ autoCreateEnabledParentQueue);
+
+ addQueue(autoCreatedLeafQueue);
+
+ //TODO - Set entitlement through capacity management policy
+ } else{
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName + ". Queue mapping specifies an invalid parent queue which does not exist "
+ + parentQueueName);
+ }
+ } else{
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName
+ + ". Queue mapping does not specify which parent queue it needs to be created under.");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue mapping does not exist for application " + applicationId + " submitted by user : " + rmApp.getUser());
+ }
+ }
+ return autoCreatedLeafQueue;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 3a519ecf5f1..4c2641b18f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -906,6 +907,11 @@ public boolean getOverrideWithQueueMappings() {
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
}
+ @VisibleForTesting
+ public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
+ setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
+ }
+
/**
* Returns a collection of strings, trimming leading and trailing whitespeace
* on each value
@@ -980,6 +986,34 @@ public boolean getOverrideWithQueueMappings() {
return mappings;
}
+ @VisibleForTesting
+ public void setQueuePlacementRules(Collection queuePlacementRules) {
+ if (queuePlacementRules == null) {
+ return;
+ }
+ String str = StringUtils.join(",", queuePlacementRules);
+ setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
+ }
+
+ public Collection getQueuePlacementRules() {
+ return getStringCollection(
+ YarnConfiguration.QUEUE_PLACEMENT_RULES);
+ }
+
+ @VisibleForTesting
+ public void setQueueMappings(List queueMappings) {
+ if (queueMappings == null) {
+ return;
+ }
+
+ List queueMappingStrs = new ArrayList<>();
+ for (QueueMapping mapping : queueMappings) {
+ queueMappingStrs.add(mapping.toString());
+ }
+
+ setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
+ }
+
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
@@ -1522,4 +1556,132 @@ public long getDefaultLifetimePerQueue(String queue) {
public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
}
+
+ @Private
+ public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
+
+ @Private
+ public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
+ "auto-create-child-queue.enabled";
+
+ public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
+ "leaf-queue-template";
+
+ @Private
+ public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
+ "auto-create-child-queue.max-queues";
+
+ @Private
+ public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
+
+ /**
+ * If true, this queue will be created as a Parent Queue which Auto Created leaf child queues
+ * @param queuePath The queues path
+ * @return true if auto create is enabled for child queues else false. Default is false
+ */
+ public boolean isAutoCreateChildQueueEnabled(String queuePath) {
+ boolean isAutoCreateEnabled =
+ getBoolean(getQueuePrefix(queuePath)
+ + AUTO_CREATE_CHILD_QUEUE_ENABLED,
+ DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
+ return isAutoCreateEnabled;
+ }
+
+ @VisibleForTesting
+ public void setAutoCreateChildQueueEnabled(String queuePath, boolean autoCreationEnabled) {
+ setBoolean(getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED, autoCreationEnabled);
+ }
+
+ /**
+ * Get the auto created leaf queue's minimum guaranteed capacity.
+ * Leaf queue's template capacities are configured at the parent queue
+ * @param queuePath The parent queue's path
+ * @return the leaf queue's template capacity
+ */
+ public float getAutoCreatedLeafQueueTemplateCapacity(String queuePath) {
+ return
+ getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + CAPACITY,
+ (float) UNDEFINED);
+ }
+
+ @VisibleForTesting
+ public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, float capacity) {
+ setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath)
+ + CAPACITY,
+ capacity);
+ }
+
+ /**
+ * Get the auto created leaf queue's maximum capacity
+ * @param queuePath The parent queue's path
+ * @return the leaf queue's template capacity
+ * @return
+ */
+ public float getAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath) {
+ return
+ getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + MAXIMUM_CAPACITY,
+ (float) UNDEFINED);
+ }
+
+ @VisibleForTesting
+ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, float maxCapacity) {
+ setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath)
+ + MAXIMUM_CAPACITY,
+ maxCapacity);
+ }
+
+ @VisibleForTesting
+ public void setAutoCreatedLeafQueueTemplateMaxApplicationsPerQueue(String queue, int maxApplicationsPerQueue) {
+ setInt(getAutoCreatedQueueTemplateConfPrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
+ maxApplicationsPerQueue);
+ }
+
+ /**
+ * Get the auto created leaf queue's template configuration prefix
+ * Leaf queue's template capacities are configured at the parent queue
+ * @param queuePath parent queue's path
+ * @return Config prefix for leaf queue template configurations
+ */
+ public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
+ return getQueuePrefix(queuePath) + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX + DOT;
+ }
+
+ @Private
+ public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
+ "auto-create-child-queue.fail-on-exceeding-parent-capacity";
+
+ @Private
+ public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = false;
+
+ /**
+ * Fail further auto leaf queue creation when parent's guaranteed capacity is
+ * exceeded.
+ * @param parentQueuePath the parent queue's path
+ * @return true if configured to fail
+ * else false
+ */
+ public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(String parentQueuePath) {
+ boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
+ getBoolean(getQueuePrefix(parentQueuePath)
+ + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
+ DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
+ return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
+ }
+
+ @VisibleForTesting
+ public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(String queuePath, boolean autoCreationEnabled) {
+ setBoolean(getQueuePrefix(queuePath) + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, autoCreationEnabled);
+ }
+
+ /**
+ * Get the max number of leaf queues that are allowed to be created under
+ * a parent queue
+ * @param queuePath the paret queue's path
+ * @return the max number of leaf queues allowed to be auto created
+ */
+ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
+ return
+ getInt(getAutoCreatedQueueTemplateConfPrefix(queuePath) + AUTO_CREATE_QUEUE_MAX_QUEUES,
+ DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 7c918a53620..f2ef9d53685 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
@@ -94,4 +95,7 @@
* @return if configuration is mutable
*/
boolean isConfigurationMutable();
+
+ Clock getClock();
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 48c289f0cde..cd668aa2ed3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
/**
*
@@ -154,7 +155,7 @@ public void setCapacitySchedulerContext(
* @throws IOException if fails to initialize queues
*/
public void initializeQueues(CapacitySchedulerConfiguration conf)
- throws IOException {
+ throws IOException {
root = parseQueue(this.csContext, conf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, appPriorityACLManager, queues);
@@ -176,7 +177,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
if (!csContext.isConfigurationMutable() ||
csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) {
- // Ensure queue hiearchy in the new XML file is proper.
+ // Ensure queue hierarchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues);
}
@@ -221,6 +222,7 @@ static CSQueue parseQueue(
: (parent.getQueuePath() + "." + queueName);
String[] childQueueNames = conf.getQueues(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName);
+ boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName);
if (childQueueNames == null || childQueueNames.length == 0) {
if (null == parent) {
throw new IllegalStateException(
@@ -238,7 +240,7 @@ static CSQueue parseQueue(
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List childQueues = new ArrayList<>();
- ReservationQueue resQueue = new ReservationQueue(csContext,
+ AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
@@ -249,11 +251,14 @@ static CSQueue parseQueue(
((PlanQueue) queue).setChildQueues(childQueues);
queues.put(defReservationId, resQueue);
+ } else if (isAutoCreateEnabled) {
+ queue = new ManagedParentQueue(csContext, queueName,
+ parent, oldQueues.get(queueName));
+
} else {
queue =
new LeafQueue(csContext, queueName, parent,
oldQueues.get(queueName));
-
// Used only for unit tests
queue = hook.hook(queue);
}
@@ -262,9 +267,16 @@ static CSQueue parseQueue(
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
- ParentQueue parentQueue =
- new ParentQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
+
+ ParentQueue parentQueue;
+ if (isAutoCreateEnabled) {
+ parentQueue = new ManagedParentQueue(csContext, queueName,
+ parent, oldQueues.get(queueName));
+ } else {
+ parentQueue =
+ new ParentQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+ }
// Used only for unit tests
queue = hook.hook(parentQueue);
@@ -277,6 +289,7 @@ static CSQueue parseQueue(
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
+
}
if (queue instanceof LeafQueue && queues.containsKey(queueName)
@@ -303,7 +316,7 @@ private void validateQueueHierarchy(Map queues,
Map newQueues) throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry e : queues.entrySet()) {
- if (!(e.getValue() instanceof ReservationQueue)) {
+ if (!(e.getValue() instanceof AutoCreatedLeafQueue)) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
@@ -323,6 +336,15 @@ private void validateQueueHierarchy(Map queues,
throw new IOException(queueName + " is moved from:"
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
+ } else if (oldQueue instanceof ParentQueue && !(oldQueue instanceof ManagedParentQueue) && newQueue
+ instanceof ManagedParentQueue) {
+ throw new IOException("Can not convert parent queue: "
+ + oldQueue.getQueuePath() + " to auto create enabled parent queue since "
+ + "it could have other pre-configured queues which is not supported");
+ } else if (oldQueue instanceof ManagedParentQueue && !(newQueue
+ instanceof ManagedParentQueue) && !(newQueue instanceof LeafQueue)) {
+ throw new IOException("Can not convert auto create enabled parent queue: "
+ + oldQueue.getQueuePath() + " to parent queue with pre-configured queues");
} else if (oldQueue instanceof LeafQueue
&& newQueue instanceof ParentQueue) {
if (oldQueue.getState() == QueueState.STOPPED) {
@@ -352,6 +374,7 @@ private void validateQueueHierarchy(Map queues,
*/
private void updateQueues(Map existingQueues,
Map newQueues) {
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
for (Map.Entry e : newQueues.entrySet()) {
String queueName = e.getKey();
CSQueue queue = e.getValue();
@@ -363,7 +386,12 @@ private void updateQueues(Map existingQueues,
.iterator(); itr.hasNext();) {
Map.Entry e = itr.next();
String queueName = e.getKey();
- if (!newQueues.containsKey(queueName)) {
+ CSQueue existingQueue = e.getValue();
+
+ //TODO - Handle case when auto create is disabled on parent queues
+ if (!newQueues.containsKey(queueName)
+ && !(existingQueue instanceof AutoCreatedLeafQueue
+ && conf.isAutoCreateChildQueueEnabled(existingQueue.getParent().getQueuePath()))) {
itr.remove();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f24e30aa1ee..1a606dc7022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1997,6 +1997,10 @@ public void setAbsoluteCapacity(float absoluteCapacity) {
queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
+ public void setMaxApplicationsPerUser(int maxApplications) {
+ this.maxApplicationsPerUser = maxApplications;
+ }
+
public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
new file mode 100644
index 00000000000..1b3f188163e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Auto Creation enabled Parent queue. This queue initially does not have any children to start with and all child
+ * leaf queues will be auto created. Currently this does not allow other pre-configured leaf or parent queues to
+ * co-exist along with auto-created leaf queues. The auto creation is limited to leaf queues currently.
+ */
+public class ManagedParentQueue extends AbstractManagedParentQueue {
+
+ private float autoCreatedLeafQueueCapacity = 0.0f;
+ private float autoCreatedLeafQueueMaxCapacity = 0.0f;
+
+ private float autoCreatedLeafQueueAbsoluteCapacity = 0.0f;
+ private float autoCreatedLeafQueueAbsoluteMaxCapacity = 0.0f;
+
+ private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ManagedParentQueue.class);
+
+ public ManagedParentQueue(final CapacitySchedulerContext cs, final String queueName, final CSQueue parent, final CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
+ }
+
+ @Override
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
+ validate(newlyParsedQueue);
+ super.reinitialize(newlyParsedQueue, clusterResource);
+ }
+
+ @Override
+ protected void initializeLeafQueueConfigs() {
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ float autoCreatedLeafQueueMaxCapacity = conf.getAutoCreatedLeafQueueTemplateMaxCapacity
+ (getQueuePath()) / 100;
+ float autoCreatedLeafQueueCapacity = conf.getAutoCreatedLeafQueueTemplateCapacity(getQueuePath()) / 100;
+
+ this.autoCreatedLeafQueueCapacity = autoCreatedLeafQueueCapacity;
+ this.autoCreatedLeafQueueMaxCapacity = autoCreatedLeafQueueMaxCapacity;
+ autoCreatedLeafQueueAbsoluteCapacity =
+ autoCreatedLeafQueueCapacity * getAbsoluteCapacity();
+ autoCreatedLeafQueueAbsoluteMaxCapacity =
+ autoCreatedLeafQueueMaxCapacity * getAbsoluteMaximumCapacity();
+
+ this.shouldFailAutoCreationWhenGuaranteedCapacityExceeded = conf
+ .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(getQueuePath());
+
+ super.initializeLeafQueueConfigs();
+ }
+
+ protected void validate(final CSQueue newlyParsedQueue) throws IOException {
+ // Sanity check
+ if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
+ }
+ }
+
+ @Override
+ public void addChildQueue(CSQueue childQueue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+
+ if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
+ throw new SchedulerDynamicEditException("Expected child queue to be an instance of AutoCreatedLeafQueue");
+ }
+
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ ManagedParentQueue parentQueue =
+ (ManagedParentQueue) childQueue.getParent();
+
+ String leafQueueName = childQueue.getQueueName();
+ int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
+ parentQueue.getQueuePath());
+
+ if (parentQueue.getChildQueues().size() >= maxQueues) {
+ throw new SchedulerDynamicEditException(
+ "Cannot auto create leaf queue " + leafQueueName + ".Max Child "
+ + "Queue limit exceeded which is configured as : " + maxQueues
+ + " and number of child queues is : " + parentQueue
+ .getChildQueues().size());
+ }
+
+ if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
+ if (parentQueue.getAutoCreatedLeafQueueAbsoluteCapacity() + parentQueue
+ .sumOfChildAbsCapacities() > parentQueue.getAbsoluteCapacity()) {
+ throw new SchedulerDynamicEditException(
+ "Cannot auto create leaf queue " + leafQueueName + ". Child "
+ + "queues capacities have reached parent queue : "
+ + parentQueue.getQueuePath() + " guaranteed capacity");
+ }
+ }
+
+ AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+ super.addChildQueue(leafQueue);
+ //TODO - refresh policy queue after capacity management is added
+
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public float getAutoCreatedLeafQueueCapacity() {
+ return autoCreatedLeafQueueCapacity;
+ }
+
+ public float getAutoCreatedLeafQueueMaxCapacity() {
+ return autoCreatedLeafQueueMaxCapacity;
+ }
+
+ public float getAutoCreatedLeafQueueAbsoluteCapacity() {
+ return autoCreatedLeafQueueAbsoluteCapacity;
+ }
+
+ public float getAutoCreatedLeafQueueAbsoluteMaxCapacity() {
+ return autoCreatedLeafQueueAbsoluteMaxCapacity;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6800b74f8d4..36d7c0c6a98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -1080,4 +1080,30 @@ public void stopQueue() {
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
+
+ protected float sumOfChildCapacities() {
+ try {
+ writeLock.lock();
+ float ret = 0;
+ for (CSQueue l : childQueues) {
+ ret += l.getCapacity();
+ }
+ return ret;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ protected float sumOfChildAbsCapacities() {
+ try {
+ writeLock.lock();
+ float ret = 0;
+ for (CSQueue l : childQueues) {
+ ret += l.getAbsoluteCapacity();
+ }
+ return ret;
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 882262fafcc..d4444105de7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -19,11 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,189 +31,48 @@
* reservations, but functionality wise is a sub-class of ParentQueue
*
*/
-public class PlanQueue extends ParentQueue {
+public class PlanQueue extends AbstractManagedParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
- private int maxAppsForReservation;
- private int maxAppsPerUserForReservation;
- private int userLimit;
- private float userLimitFactor;
- protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
-
- this.schedulerContext = cs;
- // Set the reservation queue attributes for the Plan
- CapacitySchedulerConfiguration conf = cs.getConfiguration();
- String queuePath = super.getQueuePath();
- int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
- showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
- if (maxAppsForReservation < 0) {
- maxAppsForReservation =
- (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
- .getAbsoluteCapacity());
- }
- int userLimit = conf.getUserLimit(queuePath);
- float userLimitFactor = conf.getUserLimitFactor(queuePath);
- int maxAppsPerUserForReservation =
- (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
- updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
- maxAppsPerUserForReservation);
-
- StringBuffer queueInfo = new StringBuffer();
- queueInfo.append("Created Plan Queue: ").append(queueName)
- .append("\nwith capacity: [").append(super.getCapacity())
- .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
- .append("\nwith max reservation apps: [").append(maxAppsForReservation)
- .append("]\nwith max reservation apps per user: [")
- .append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
- .append(userLimit).append("]\nwith user limit factor: [")
- .append(userLimitFactor).append("].");
- LOG.info(queueInfo.toString());
}
@Override
- public void reinitialize(CSQueue newlyParsedQueue,
- Resource clusterResource) throws IOException {
- try {
- writeLock.lock();
- // Sanity check
- if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
- .getQueuePath().equals(getQueuePath())) {
- throw new IOException(
- "Trying to reinitialize " + getQueuePath() + " from "
- + newlyParsedQueue.getQueuePath());
- }
-
- PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
-
- if (newlyParsedParentQueue.getChildQueues().size() != 1) {
- throw new IOException(
- "Reservable Queue should not have sub-queues in the"
- + "configuration expect the default reservation queue");
- }
-
- // Set new configs
- setupQueueConfigs(clusterResource);
-
- updateQuotas(newlyParsedParentQueue.userLimit,
- newlyParsedParentQueue.userLimitFactor,
- newlyParsedParentQueue.maxAppsForReservation,
- newlyParsedParentQueue.maxAppsPerUserForReservation);
-
- // run reinitialize on each existing queue, to trigger absolute cap
- // recomputations
- for (CSQueue res : this.getChildQueues()) {
- res.reinitialize(res, clusterResource);
- }
- showReservationsAsQueues =
- newlyParsedParentQueue.showReservationsAsQueues;
- } finally {
- writeLock.unlock();
- }
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
+ validate(newlyParsedQueue);
+ super.reinitialize(newlyParsedQueue, clusterResource);
}
- void addChildQueue(CSQueue newQueue)
- throws SchedulerDynamicEditException {
- try {
- writeLock.lock();
- if (newQueue.getCapacity() > 0) {
- throw new SchedulerDynamicEditException(
- "Queue " + newQueue + " being added has non zero capacity.");
- }
- boolean added = this.childQueues.add(newQueue);
- if (LOG.isDebugEnabled()) {
- LOG.debug("updateChildQueues (action: add queue): " + added + " "
- + getChildQueuesToPrint());
- }
- } finally {
- writeLock.unlock();
- }
+ @Override
+ protected void initializeLeafQueueConfigs() {
+ String queuePath = super.getQueuePath();
+ showReservationsAsQueues = csContext.getConfiguration()
+ .getShowReservationAsQueues(queuePath);
+ super.initializeLeafQueueConfigs();
}
- void removeChildQueue(CSQueue remQueue)
- throws SchedulerDynamicEditException {
- try {
- writeLock.lock();
- if (remQueue.getCapacity() > 0) {
- throw new SchedulerDynamicEditException(
- "Queue " + remQueue + " being removed has non zero capacity.");
- }
- Iterator qiter = childQueues.iterator();
- while (qiter.hasNext()) {
- CSQueue cs = qiter.next();
- if (cs.equals(remQueue)) {
- qiter.remove();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removed child queue: {}", cs.getQueueName());
- }
- }
- }
- } finally {
- writeLock.unlock();
+ protected void validate(final CSQueue newlyParsedQueue) throws IOException {
+ // Sanity check
+ if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
}
- }
- protected float sumOfChildCapacities() {
- try {
- writeLock.lock();
- float ret = 0;
- for (CSQueue l : childQueues) {
- ret += l.getCapacity();
- }
- return ret;
- } finally {
- writeLock.unlock();
- }
- }
+ PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
- private void updateQuotas(int userLimit, float userLimitFactor,
- int maxAppsForReservation, int maxAppsPerUserForReservation) {
- this.userLimit = userLimit;
- this.userLimitFactor = userLimitFactor;
- this.maxAppsForReservation = maxAppsForReservation;
- this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
- }
-
- /**
- * Number of maximum applications for each of the reservations in this Plan.
- *
- * @return maxAppsForreservation
- */
- public int getMaxApplicationsForReservations() {
- return maxAppsForReservation;
- }
-
- /**
- * Number of maximum applications per user for each of the reservations in
- * this Plan.
- *
- * @return maxAppsPerUserForreservation
- */
- public int getMaxApplicationsPerUserForReservation() {
- return maxAppsPerUserForReservation;
- }
-
- /**
- * User limit value for each of the reservations in this Plan.
- *
- * @return userLimit
- */
- public int getUserLimitForReservation() {
- return userLimit;
- }
-
- /**
- * User limit factor value for each of the reservations in this Plan.
- *
- * @return userLimitFactor
- */
- public float getUserLimitFactor() {
- return userLimitFactor;
+ if (newlyParsedParentQueue.getChildQueues().size() != 1) {
+ throw new IOException(
+ "Reservable Queue should not have sub-queues in the"
+ + "configuration expect the default reservation queue");
+ }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 399df02465e..10b9ced2f28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -243,6 +244,11 @@ public boolean isAppInCompletedStates() {
throw new UnsupportedOperationException("Not supported yet.");
}
+ @Override
+ public ApplicationPlacementContext getApplicationQueuePlacementContext() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
@Override
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
index 61bc8d9be2c..a6cdeb61f39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java
@@ -58,8 +58,8 @@ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
ApplicationSubmissionContext asc =
Records.newRecord(ApplicationSubmissionContext.class);
asc.setQueue(inputQueue);
- String queue = rule.getQueueForApp(asc, inputUser);
- Assert.assertEquals(expectedQueue, queue);
+ ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser);
+ Assert.assertEquals(expectedQueue, ctx != null ? ctx.getQueue() : inputQueue);
}
@Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 39a7f995ab6..ad6c584b04a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -327,6 +328,11 @@ public boolean isAppInCompletedStates() {
return false;
}
+ @Override
+ public ApplicationPlacementContext getApplicationQueuePlacementContext() {
+ return null;
+ }
+
@Override
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
similarity index 70%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
index e23e93c99dd..b403e724533 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
@@ -36,15 +36,19 @@
import org.junit.Before;
import org.junit.Test;
-public class TestReservationQueue {
+/**
+ * Test class for dynamic auto created leaf queues.
+ * @see AutoCreatedLeafQueue
+ */
+public class TestAutoCreatedLeafQueue {
- CapacitySchedulerConfiguration csConf;
- CapacitySchedulerContext csContext;
+ private CapacitySchedulerConfiguration csConf;
+ private CapacitySchedulerContext csContext;
final static int DEF_MAX_APPS = 10000;
final static int GB = 1024;
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
- ReservationQueue reservationQueue;
+ private AutoCreatedLeafQueue autoCreatedLeafQueue;
@Before
public void setup() throws IOException {
@@ -61,49 +65,48 @@ public void setup() throws IOException {
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-
RMContext mockRMContext = TestUtils.getMockRMContext();
when(csContext.getRMContext()).thenReturn(mockRMContext);
// create a queue
PlanQueue pq = new PlanQueue(csContext, "root", null, null);
- reservationQueue = new ReservationQueue(csContext, "a", pq);
+ autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq);
}
- private void validateReservationQueue(double capacity) {
- assertTrue(" actual capacity: " + reservationQueue.getCapacity(),
- reservationQueue.getCapacity() - capacity < CSQueueUtils.EPSILON);
- assertEquals(reservationQueue.maxApplications, DEF_MAX_APPS);
- assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS);
+ private void validateAutoCreatedLeafQueue(double capacity) {
+ assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(),
+ autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON);
+ assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS);
+ assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS);
}
@Test
public void testAddSubtractCapacity() throws Exception {
// verify that setting, adding, subtracting capacity works
- reservationQueue.setCapacity(1.0F);
- validateReservationQueue(1);
- reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
- validateReservationQueue(0.9);
- reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f));
- validateReservationQueue(1);
- reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f));
- validateReservationQueue(0);
+ autoCreatedLeafQueue.setCapacity(1.0F);
+ validateAutoCreatedLeafQueue(1);
+ autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
+ validateAutoCreatedLeafQueue(0.9);
+ autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f));
+ validateAutoCreatedLeafQueue(1);
+ autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f));
+ validateAutoCreatedLeafQueue(0);
try {
- reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
+ autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
fail();
} catch (SchedulerDynamicEditException iae) {
// expected
- validateReservationQueue(1);
+ validateAutoCreatedLeafQueue(1);
}
try {
- reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
+ autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
fail();
} catch (SchedulerDynamicEditException iae) {
// expected
- validateReservationQueue(1);
+ validateAutoCreatedLeafQueue(1);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 1dea4eea75f..a19138dd7a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -103,6 +105,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -904,7 +907,7 @@ void checkQueueCapacities(CapacityScheduler cs,
(B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
}
- private void checkQueueCapacity(CSQueue q, float expectedCapacity,
+ void checkQueueCapacity(CSQueue q, float expectedCapacity,
float expectedAbsCapacity, float expectedMaxCapacity,
float expectedAbsMaxCapacity) {
final float epsilon = 1e-5f;
@@ -917,7 +920,7 @@ private void checkQueueCapacity(CSQueue q, float expectedCapacity,
q.getAbsoluteMaximumCapacity(), epsilon);
}
- private CSQueue findQueue(CSQueue root, String queuePath) {
+ CSQueue findQueue(CSQueue root, String queuePath) {
if (root.getQueuePath().equals(queuePath)) {
return root;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
new file mode 100644
index 00000000000..48b106019dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -0,0 +1,697 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for creation and reinitilization of auto created leaf queues
+ * under a ManagedParentQueue.
+ */
+public class TestCapacitySchedulerAutoQueueCreation {
+
+ private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private final int GB = 1024;
+ private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+ new ContainerUpdates();
+
+ private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ private static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ private static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+ private static final String A1 = A + ".a1";
+ private static final String A2 = A + ".a2";
+ private static final String B1 = B + ".b1";
+ private static final String B2 = B + ".b2";
+ private static final String B3 = B + ".b3";
+ private static final String C1 = C + ".c1";
+ private static final String C2 = C + ".c2";
+ private static final String C3 = C + ".c3";
+ private static float A_CAPACITY = 20f;
+ private static float B_CAPACITY = 40f;
+ private static float C_CAPACITY = 20f;
+ private static float D_CAPACITY = 20f;
+ private static float A1_CAPACITY = 30;
+ private static float A2_CAPACITY = 70;
+ private static float B1_CAPACITY = 60f;
+ private static float B2_CAPACITY = 20f;
+ private static float B3_CAPACITY = 20f;
+ private static float C1_CAPACITY = 20f;
+ private static float C2_CAPACITY = 20f;
+
+ final String USER = "user_";
+ final String USER0 = USER + 0;
+ final String USER2 = USER + 2;
+ final String PARENT_QUEUE = "c";
+
+ private MockRM mockRM = null;
+
+ private CapacityScheduler cs;
+
+ private final TestCapacityScheduler tcs = new TestCapacityScheduler();
+
+ @Before
+ public void setUp() throws Exception {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ List queuePlacementRules = new ArrayList();
+ queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
+ conf.setQueuePlacementRules(queuePlacementRules);
+
+ setupQueueMappings(conf);
+
+ mockRM = new MockRM(conf);
+ cs = (CapacityScheduler) mockRM.getResourceScheduler();
+ cs.updatePlacementRules();
+ mockRM.start();
+
+ cs.start();
+ }
+
+ private CapacitySchedulerConfiguration setupQueueMappings(
+ CapacitySchedulerConfiguration conf) {
+
+ //set queue mapping
+ List queueMappings =
+ new ArrayList<>();
+ for (int i = 0; i <= 3; i++) {
+ //Set C as parent queue name for auto queue creation
+ UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
+ new UserGroupMappingPlacementRule.QueueMapping(
+ UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
+ USER + i, getQueueMapping(PARENT_QUEUE, USER + i));
+ queueMappings.add(userQueueMapping);
+ }
+
+ conf.setQueueMappings(queueMappings);
+ //override with queue mappings
+ conf.setOverrideWithQueueMappings(true);
+ return conf;
+ }
+
+ /**
+ * @param conf, to be modified
+ * @return, CS configuration which has C as an auto creation enabled parent queue
+ *
+ * root
+ * / \ \ \
+ * a b c d
+ * / \ / | \
+ * a1 a2 b1 b2 b3
+ */
+ private CapacitySchedulerConfiguration setupQueueConfiguration(
+ CapacitySchedulerConfiguration conf) {
+
+ //setup new queues with one of them auto enabled
+ // Define top-level queues
+ // Set childQueue for root
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b", "c", "d" });
+
+ conf.setCapacity(A, A_CAPACITY);
+ conf.setCapacity(B, B_CAPACITY);
+ conf.setCapacity(C, C_CAPACITY);
+ conf.setCapacity(D, D_CAPACITY);
+
+ // Define 2nd-level queues
+ conf.setQueues(A, new String[] { "a1", "a2" });
+ conf.setCapacity(A1, A1_CAPACITY);
+ conf.setUserLimitFactor(A1, 100.0f);
+ conf.setCapacity(A2, A2_CAPACITY);
+ conf.setUserLimitFactor(A2, 100.0f);
+
+ conf.setQueues(B, new String[] { "b1", "b2", "b3" });
+ conf.setCapacity(B1, B1_CAPACITY);
+ conf.setUserLimitFactor(B1, 100.0f);
+ conf.setCapacity(B2, B2_CAPACITY);
+ conf.setUserLimitFactor(B2, 100.0f);
+ conf.setCapacity(B3, B3_CAPACITY);
+ conf.setUserLimitFactor(B3, 100.0f);
+
+ conf.setUserLimitFactor(C, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(C, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
+
+ LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
+
+ conf.setUserLimitFactor(D, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(D, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f);
+
+ LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
+
+ return conf;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mockRM != null) {
+ mockRM.stop();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testAutoCreateLeafQueueCreation() throws Exception {
+
+ try {
+ // submit an app
+ RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0,
+ null, USER0);
+ // check preconditions
+ List appsInC = cs.getAppsInQueue(PARENT_QUEUE);
+ assertEquals(1, appsInC.size());
+ ApplicationPlacementContext placementContext =
+ app.getApplicationQueuePlacementContext();
+ validatePlacementContext(placementContext, USER0, PARENT_QUEUE);
+
+ assertNotNull(cs.getQueue(USER0));
+
+ AutoCreatedLeafQueue autoCreatedLeafQueue =
+ (AutoCreatedLeafQueue) cs.getQueue(USER0);
+ ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue("c");
+ assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
+ validateCapacities(autoCreatedLeafQueue);
+ } finally {
+ cleanupQueue(USER0);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
+
+ try {
+ String host = "127.0.0.1";
+ RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
+ host);
+ cs.handle(new NodeAddedSchedulerEvent(node));
+
+ // submit an app
+ RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0,
+ null, USER0);
+ // check preconditions
+ List appsInC = cs.getAppsInQueue(PARENT_QUEUE);
+ assertEquals(1, appsInC.size());
+ ApplicationPlacementContext placementContext =
+ app.getApplicationQueuePlacementContext();
+ validatePlacementContext(placementContext, USER0, PARENT_QUEUE);
+
+ assertNotNull(cs.getQueue(USER0));
+
+ AutoCreatedLeafQueue autoCreatedLeafQueue =
+ (AutoCreatedLeafQueue) cs.getQueue(USER0);
+ ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue("c");
+ assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
+ validateCapacities(autoCreatedLeafQueue);
+
+ ApplicationAttemptId appAttemptId = appsInC.get(0);
+
+ Priority priority = TestUtils.createMockPriority(1);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(
+ null);
+ ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY,
+ 1 * GB, 1, true, priority, recordFactory);
+ cs.allocate(appAttemptId, Collections.singletonList(r1),
+ Collections.emptyList(), Collections.singletonList(host),
+ null, NULL_UPDATE_REQUESTS);
+
+ //And this will result in container assignment for app1
+ CapacityScheduler.schedule(cs);
+
+ //change state to draining
+ autoCreatedLeafQueue.stopQueue();
+
+ cs.killAllAppsInQueue(USER0);
+
+ mockRM.waitForState(appAttemptId, RMAppAttemptState.KILLED);
+
+ mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED);
+
+ //change state to stopped
+ autoCreatedLeafQueue.stopQueue();
+ assertEquals(QueueState.STOPPED,
+ autoCreatedLeafQueue.getQueueInfo().getQueueState());
+
+ cs.reinitialize(cs.getConf(), mockRM.getRMContext());
+
+ AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
+ USER0);
+ validateCapacities(leafQueue);
+
+ } finally {
+ cleanupQueue(USER0);
+ }
+ }
+
+ @Test
+ public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception {
+
+ MockRM mockRM = setupSchedulerInstance();
+ try {
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
+ CapacitySchedulerConfiguration conf = cs.getConfiguration();
+
+ // Test add one auto created queue dynamically and manually modify capacity
+ AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(cs, "c1", (ManagedParentQueue) cs.getQueue("c"));
+ cs.addQueue(c1);
+ c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f));
+
+ // Test add another auto created queue and use setEntitlement to modify
+ // capacity
+ AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(cs, "c2", (ManagedParentQueue) cs.getQueue("c"));
+ cs.addQueue(c2);
+ cs.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f));
+
+ // Verify all allocations match
+ checkQueueCapacities(cs, C_CAPACITY, D_CAPACITY);
+
+ // Reinitialize and verify all dynamic queued survived
+
+ conf.setCapacity(A, 20f);
+ conf.setCapacity(B, 20f);
+ conf.setCapacity(C, 40f);
+ conf.setCapacity(D, 20f);
+ cs.reinitialize(conf, mockRM.getRMContext());
+
+ checkQueueCapacities(cs, 40f, 20f);
+
+ //chnage parent template configs and reinitialize
+ conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
+ cs.reinitialize(conf, mockRM.getRMContext());
+
+ ManagedParentQueue c = (ManagedParentQueue) cs.getQueue("c");
+ AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(cs, "c3", c);
+ cs.addQueue(c3);
+
+ c3.setEntitlement(
+ new QueueEntitlement(c.getAutoCreatedLeafQueueCapacity(), 1f));
+ cs.reinitialize(conf, mockRM.getRMContext());
+
+ checkQueueCapacities(cs, 40f, 20f);
+ } finally {
+ if ( mockRM != null) {
+ ((CapacityScheduler) mockRM.getResourceScheduler()).stop();
+ mockRM.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testReinitializeFailsWithAutoCreateDisabledOnManagedParentQueue()
+ throws Exception {
+ CapacityScheduler newCS = new CapacityScheduler();
+ try {
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(newConf);
+
+ //TODO - should we allow this and retain all auto created leaf queues
+ // as normal leaf queues?
+ // what should be the behaviour in phase 1?
+ newConf.setAutoCreateChildQueueEnabled(C, false);
+
+ newCS.setConf(new YarnConfiguration());
+ newCS.setRMContext(mockRM.getRMContext());
+ newCS.init(cs.getConf());
+ newCS.start();
+
+ newCS.reinitialize(newConf,
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(newConf),
+ new NMTokenSecretManagerInRM(newConf),
+ new ClientToAMTokenSecretManagerInRM(), null));
+
+ fail(
+ "Expected exception while converting a auto create enabled parent queue to a parent queue");
+ } catch (IOException e) {
+ //expected exception
+ } finally {
+ newCS.stop();
+ }
+ }
+
+ @Test
+ public void testConvertLeafQueueToParentQueueWithAutoCreate()
+ throws Exception {
+ CapacityScheduler newCS = new CapacityScheduler();
+ try {
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(newConf);
+ newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10);
+ newConf.setAutoCreateChildQueueEnabled(A1, true);
+
+ newCS.setConf(new YarnConfiguration());
+ newCS.setRMContext(mockRM.getRMContext());
+ newCS.init(cs.getConf());
+ newCS.start();
+
+ final LeafQueue a1Queue = (LeafQueue) newCS.getQueue("a1");
+ a1Queue.stopQueue();
+
+ newCS.reinitialize(newConf,
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(newConf),
+ new NMTokenSecretManagerInRM(newConf),
+ new ClientToAMTokenSecretManagerInRM(), null));
+
+ } finally {
+ newCS.stop();
+ }
+ }
+
+ @Test
+ public void testConvertFailsFromParentQueueToManagedParentQueue()
+ throws Exception {
+ CapacityScheduler newCS = new CapacityScheduler();
+ try {
+ CapacitySchedulerConfiguration newConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(newConf);
+ newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10);
+ newConf.setAutoCreateChildQueueEnabled(A, true);
+
+ newCS.setConf(new YarnConfiguration());
+ newCS.setRMContext(mockRM.getRMContext());
+ newCS.init(cs.getConf());
+ newCS.start();
+
+ final ParentQueue a1Queue = (ParentQueue) newCS.getQueue("a");
+ a1Queue.stopQueue();
+
+ newCS.reinitialize(newConf,
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(newConf),
+ new NMTokenSecretManagerInRM(newConf),
+ new ClientToAMTokenSecretManagerInRM(), null));
+
+ fail(
+ "Expected exception while converting a parent queue to an auto create enabled parent queue");
+ } catch (IOException e) {
+ //expected exception
+ } finally {
+ newCS.stop();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testAutoCreateLeafQueueFailsWithNoQueueMapping()
+ throws Exception {
+
+ final String INVALID_USER = "invalid_user";
+
+ // submit an app under a different queue name which does not exist and queue mapping does not exist for this user
+ RMApp app = mockRM.submitApp(GB, "app", INVALID_USER, null, INVALID_USER,
+ false);
+ mockRM.drainEvents();
+ mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED);
+ assertEquals(RMAppState.FAILED, app.getState());
+ }
+
+ private void validatePlacementContext(
+ ApplicationPlacementContext placementContext, String source,
+ String parentQueue) {
+ assertNotNull(placementContext);
+ assertEquals(placementContext.getParentQueue(), parentQueue);
+ assertEquals(placementContext.getQueue(), source);
+
+ assertEquals(placementContext.getRule().getName(),
+ UserGroupMappingPlacementRule.class.getName());
+ }
+
+ private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) {
+ assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON);
+ assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON);
+ assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON);
+ assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f,
+ EPSILON);
+ int maxAppsForAutoCreatedQueues = (int) (
+ CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
+ * autoCreatedLeafQueue.getParent().getAbsoluteCapacity());
+ assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
+ maxAppsForAutoCreatedQueues);
+ assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(),
+ (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration()
+ .getUserLimitFactor(
+ autoCreatedLeafQueue.getParent().getQueuePath()))));
+ }
+
+ private void cleanupQueue(String queueName) throws YarnException {
+ AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName);
+ if (queue != null) {
+ queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f));
+ ((ManagedParentQueue) queue.getParent()).removeChildQueue(
+ queue.getQueueName());
+ cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName());
+ } else{
+ throw new YarnException("Queue does not exist " + queueName);
+ }
+ }
+
+ private ApplicationId submitApp(CSQueue parentQueue, String leafQueueName,
+ String user, int expectedNumAppsInParentQueue,
+ int expectedNumAppsInLeafQueue) throws Exception {
+ // submit an app
+ RMApp rmApp = mockRM.submitApp(GB, "test-auto-queue-activation", user, null,
+ leafQueueName);
+
+ // check preconditions
+ List appsInParentQueue = cs.getAppsInQueue(
+ parentQueue.getQueueName());
+ assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size());
+
+ List appsInLeafQueue = cs.getAppsInQueue(
+ leafQueueName);
+ assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size());
+
+ return rmApp.getApplicationId();
+ }
+
+ String getQueueMapping(String parentQueue, String leafQueue) {
+ return parentQueue + DOT + leafQueue;
+ }
+
+ @Test(timeout = 10000)
+ public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
+ throws Exception {
+
+ MockRM mockRM = setupSchedulerInstance();
+ try {
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
+
+ //"a" is not auto create enabled
+
+ //dynamic queue mapping
+ try {
+ setupQueueMapping(cs, CURRENT_USER_MAPPING, "a", CURRENT_USER_MAPPING);
+ cs.updatePlacementRules();
+ fail("Expected invalid parent queue mapping failure");
+
+ } catch (IOException e) {
+ //expected exception
+ assertTrue(e.getMessage().contains(
+ "invalid parent queue which does not have auto creation of leaf queues enabled ["
+ + "a" + "]"));
+ }
+
+ //"a" is not auto create enabled and app_user does not exist as a leaf queue
+ //static queue mapping
+ try {
+ setupQueueMapping(cs, "app_user", "INVALID_PARENT_QUEUE", "app_user");
+ cs.updatePlacementRules();
+ fail("Expected invalid parent queue mapping failure");
+ } catch (IOException e) {
+ //expected exception
+ assertTrue(e.getMessage()
+ .contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]"));
+ }
+ } finally {
+ if ( mockRM != null) {
+ ((CapacityScheduler) mockRM.getResourceScheduler()).stop();
+ mockRM.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping()
+ throws Exception {
+
+ MockRM mockRM = setupSchedulerInstance();
+ try {
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
+
+ setupQueueMapping(cs, CURRENT_USER_MAPPING, "c", CURRENT_USER_MAPPING);
+ cs.updatePlacementRules();
+
+ try {
+ setupQueueMapping(cs, CURRENT_USER_MAPPING, "", CURRENT_USER_MAPPING);
+ cs.updatePlacementRules();
+ fail("Expected invalid parent queue mapping failure");
+ } catch (IOException e) {
+ //expected exception
+ assertTrue(e.getMessage().contains("invalid parent queue []"));
+ }
+ } finally {
+ if (mockRM != null) {
+ ((CapacityScheduler) mockRM.getResourceScheduler()).stop();
+ mockRM.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testParentQueueUpdateInQueueMappingFailsAfterAutoCreation()
+ throws Exception {
+
+ MockRM mockRM = setupSchedulerInstance();
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
+ try {
+ mockRM.start();
+ cs.start();
+
+ RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, null,
+ USER0);
+
+ assertNotNull(cs.getQueue(USER0));
+
+ setupQueueMapping(cs, USER0, "d", USER0);
+ cs.updatePlacementRules();
+
+ app = mockRM.submitApp(GB, "test-auto-queue-creation-2", USER0, null,
+ USER0, false);
+ mockRM.drainEvents();
+ mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED);
+ assertEquals(RMAppState.FAILED, app.getState());
+ } finally {
+ if ( mockRM != null) {
+ ((CapacityScheduler) mockRM.getResourceScheduler()).stop();
+ mockRM.stop();
+ }
+ }
+ }
+
+ private List setupQueueMapping(
+ CapacityScheduler cs, String user, String parentQueue, String queue) {
+ List queueMappings =
+ new ArrayList<>();
+ queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping(
+ UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user,
+ getQueueMapping(parentQueue, queue)));
+ cs.getConfiguration().setQueueMappings(queueMappings);
+ return queueMappings;
+ }
+
+ private MockRM setupSchedulerInstance() {
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(conf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ List queuePlacementRules = new ArrayList();
+ queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
+ conf.setQueuePlacementRules(queuePlacementRules);
+
+ setupQueueMappings(conf);
+
+ MockRM mockRM = new MockRM(conf);
+ return mockRM;
+ }
+
+ void checkQueueCapacities(CapacityScheduler cs,
+ float capacityC, float capacityD) {
+ CSQueue rootQueue = cs.getRootQueue();
+ CSQueue queueC = tcs.findQueue(rootQueue, C);
+ CSQueue queueD = tcs.findQueue(rootQueue, D);
+ CSQueue queueC1 = tcs.findQueue(queueC, C1);
+ CSQueue queueC2 = tcs.findQueue(queueC, C2);
+ CSQueue queueC3 = tcs.findQueue(queueC, C3);
+
+ float capC = capacityC / 100.0f;
+ float capD = capacityD / 100.0f;
+
+ tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f);
+ tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f);
+ tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f,
+ (C1_CAPACITY/100.0f) * capC, 1.0f, 1.0f);
+ tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f,
+ (C2_CAPACITY/100.0f) * capC, 1.0f, 1.0f);
+
+ if ( queueC3 != null ) {
+ float leafQueueTemplateCapacity = cs.getConfiguration()
+ .getAutoCreatedLeafQueueTemplateCapacity(C);
+ tcs.checkQueueCapacity(queueC3,
+ leafQueueTemplateCapacity/ 100.0f,
+ (leafQueueTemplateCapacity / 100.0f) * capC, 1.0f, 1.0f);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
index 9aba30c2e88..9425d5ea89b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
@@ -77,21 +77,21 @@ public void testRefreshQueuesWithReservations() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
//set default queue capacity to zero
- ((ReservationQueue) cs
+ ((AutoCreatedLeafQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
// Test add one reservation dynamically and manually modify capacity
- ReservationQueue a1 =
- new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a1 =
+ new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
// Test add another reservation queue and use setEntitlement to modify
// capacity
- ReservationQueue a2 =
- new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a2 =
+ new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2);
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
@@ -113,8 +113,8 @@ public void testAddQueueFailCases() throws Exception {
try {
// Test invalid addition (adding non-zero size queue)
- ReservationQueue a1 =
- new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a1 =
+ new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
cs.addQueue(a1);
fail();
@@ -123,11 +123,11 @@ public void testAddQueueFailCases() throws Exception {
}
// Test add one reservation dynamically and manually modify capacity
- ReservationQueue a1 =
- new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a1 =
+ new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
//set default queue capacity to zero
- ((ReservationQueue) cs
+ ((AutoCreatedLeafQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
@@ -135,8 +135,8 @@ public void testAddQueueFailCases() throws Exception {
// Test add another reservation queue and use setEntitlement to modify
// capacity
- ReservationQueue a2 =
- new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a2 =
+ new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2);
@@ -162,8 +162,8 @@ public void testRemoveQueue() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Test add one reservation dynamically and manually modify capacity
- ReservationQueue a1 =
- new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
+ AutoCreatedLeafQueue a1 =
+ new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
@@ -230,8 +230,8 @@ public void testMoveAppToPlanQueue() throws Exception {
// create the default reservation queue
String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
- ReservationQueue defQ =
- new ReservationQueue(scheduler, defQName,
+ AutoCreatedLeafQueue defQ =
+ new AutoCreatedLeafQueue(scheduler, defQName,
(PlanQueue) scheduler.getQueue("a"));
scheduler.addQueue(defQ);
defQ.setEntitlement(new QueueEntitlement(1f, 1f));