diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index d0425907f6a..5e82f401b6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -360,13 +360,8 @@ protected void recoverApplication(ApplicationStateData appState,
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException {
+
if (!isRecovery) {
- // Do queue mapping
- if (rmContext.getQueuePlacementManager() != null) {
- // We only do queue mapping when it's a new application
- rmContext.getQueuePlacementManager().placeApplication(
- submissionContext, user);
- }
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
new file mode 100644
index 00000000000..f2f92b81fbc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+/**
+ * Each placement rule when it successfully places an application onto a queue
+ * returns a PlacementRuleContext which encapsulates the queue the
+ * application was mapped to and any parent queue for the queue (if configured)
+ */
+public class ApplicationPlacementContext {
+
+ private String queue;
+
+ private String parentQueue;
+
+ public ApplicationPlacementContext(String queue) {
+ this(queue,null);
+ }
+
+ public ApplicationPlacementContext(String queue, String parentQueue) {
+ this.queue = queue;
+ this.parentQueue = parentQueue;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public String getParentQueue() {
+ return parentQueue;
+ }
+
+ public boolean hasParentQueue() {
+ return parentQueue != null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 43a4deb70eb..c0067385b95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -23,7 +23,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,36 +52,33 @@ public void updateRules(List
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Auto Creation enabled Parent queue. This queue initially does not have any
+ * children to start with and all child
+ * leaf queues will be auto created. Currently this does not allow other
+ * pre-configured leaf or parent queues to
+ * co-exist along with auto-created leaf queues. The auto creation is limited
+ * to leaf queues currently.
+ */
+public class ManagedParentQueue extends AbstractManagedParentQueue {
+
+ private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ManagedParentQueue.class);
+
+ public ManagedParentQueue(final CapacitySchedulerContext cs,
+ final String queueName, final CSQueue parent, final CSQueue old)
+ throws IOException {
+ super(cs, queueName, parent, old);
+ String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+ csContext.getConfiguration());
+ this.leafQueueTemplate = initializeLeafQueueConfigs(
+ leafQueueTemplateConfPrefix).build();
+
+ StringBuffer queueInfo = new StringBuffer();
+ queueInfo.append("Created Managed Parent Queue: ").append(queueName).append(
+ "]\nwith capacity: [").append(super.getCapacity()).append(
+ "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
+ "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
+ "]\nwith max apps per user: [").append(
+ leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
+ .append(leafQueueTemplate.getUserLimit()).append(
+ "]\nwith user limit factor: [").append(
+ leafQueueTemplate.getUserLimitFactor()).append("].");
+ LOG.info(queueInfo.toString());
+ }
+
+ @Override
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
+ validate(newlyParsedQueue);
+ super.reinitialize(newlyParsedQueue, clusterResource);
+ String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+ csContext.getConfiguration());
+ this.leafQueueTemplate = initializeLeafQueueConfigs(
+ leafQueueTemplateConfPrefix).build();
+ }
+
+ @Override
+ protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
+ String queuePath) {
+
+ AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate =
+ super.initializeLeafQueueConfigs(queuePath);
+
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf);
+ QueueCapacities queueCapacities = new QueueCapacities(false);
+ CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix,
+ csContext.getConfiguration(), queueCapacities, getQueueCapacities());
+ leafQueueTemplate.capacities(queueCapacities);
+
+ shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
+ conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+ getQueuePath());
+
+ return leafQueueTemplate;
+ }
+
+ protected void validate(final CSQueue newlyParsedQueue) throws IOException {
+ // Sanity check
+ if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
+ }
+ }
+
+ @Override
+ public void addChildQueue(CSQueue childQueue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+
+ if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
+ throw new SchedulerDynamicEditException(
+ "Expected child queue to be an instance of AutoCreatedLeafQueue");
+ }
+
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ ManagedParentQueue parentQueue =
+ (ManagedParentQueue) childQueue.getParent();
+
+ String leafQueueName = childQueue.getQueueName();
+ int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
+ parentQueue.getQueuePath());
+
+ if (parentQueue.getChildQueues().size() >= maxQueues) {
+ throw new SchedulerDynamicEditException(
+ "Cannot auto create leaf queue " + leafQueueName + ".Max Child "
+ + "Queue limit exceeded which is configured as : " + maxQueues
+ + " and number of child queues is : " + parentQueue
+ .getChildQueues().size());
+ }
+
+ if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
+ if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
+ + parentQueue.sumOfChildAbsCapacities() > parentQueue
+ .getAbsoluteCapacity()) {
+ throw new SchedulerDynamicEditException(
+ "Cannot auto create leaf queue " + leafQueueName + ". Child "
+ + "queues capacities have reached parent queue : "
+ + parentQueue.getQueuePath() + " guaranteed capacity");
+ }
+ }
+
+ AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+ super.addChildQueue(leafQueue);
+ //TODO - refresh policy queue after capacity management is added
+
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
+ return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index d61951bb8f7..959ca51eb2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -1081,17 +1081,4 @@ public void stopQueue() {
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
-
- protected float sumOfChildCapacities() {
- try {
- writeLock.lock();
- float ret = 0;
- for (CSQueue l : childQueues) {
- ret += l.getCapacity();
- }
- return ret;
- } finally {
- writeLock.unlock();
- }
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 4ab2e9f14df..b7f8aa6996b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -40,6 +40,19 @@
public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
+ this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
+
+ StringBuffer queueInfo = new StringBuffer();
+ queueInfo.append("Created Plan Queue: ").append(queueName).append(
+ "]\nwith capacity: [").append(super.getCapacity()).append(
+ "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
+ "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
+ "]\nwith max apps per user: [").append(
+ leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
+ .append(leafQueueTemplate.getUserLimit()).append(
+ "]\nwith user limit factor: [").append(
+ leafQueueTemplate.getUserLimitFactor()).append("].");
+ LOG.info(queueInfo.toString());
}
@Override
@@ -47,17 +60,21 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
validate(newlyParsedQueue);
super.reinitialize(newlyParsedQueue, clusterResource);
+ this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
}
@Override
- protected void initializeLeafQueueConfigs() {
- String queuePath = super.getQueuePath();
+ protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
+ (String queuePath) {
+ AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super
+ .initializeLeafQueueConfigs
+ (queuePath);
showReservationsAsQueues = csContext.getConfiguration()
.getShowReservationAsQueues(queuePath);
- super.initializeLeafQueueConfigs();
+ return leafQueueTemplate;
}
- private void validate(final CSQueue newlyParsedQueue) throws IOException {
+ protected void validate(final CSQueue newlyParsedQueue) throws IOException {
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
index 0a8d6fe529b..620969e761a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
public class AppAddedSchedulerEvent extends SchedulerEvent {
@@ -31,15 +32,22 @@
private final ReservationId reservationID;
private final boolean isAppRecovering;
private final Priority appPriority;
+ private final ApplicationPlacementContext placementContext;
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user) {
- this(applicationId, queue, user, false, null, Priority.newInstance(0));
+ this(applicationId, queue, user, false, null, Priority.newInstance(0),
+ null);
+ }
+
+ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+ String user, ApplicationPlacementContext placementContext) {
+ this(applicationId, queue, user, false, null, Priority.newInstance(0), placementContext);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, ReservationId reservationID, Priority appPriority) {
- this(applicationId, queue, user, false, reservationID, appPriority);
+ this(applicationId, queue, user, false, reservationID, appPriority, null);
}
public AppAddedSchedulerEvent(String user,
@@ -47,12 +55,20 @@ public AppAddedSchedulerEvent(String user,
Priority appPriority) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(),
- appPriority);
+ appPriority, null);
+ }
+
+ public AppAddedSchedulerEvent(String user,
+ ApplicationSubmissionContext submissionContext, boolean isAppRecovering,
+ Priority appPriority, ApplicationPlacementContext placementContext) {
+ this(submissionContext.getApplicationId(), submissionContext.getQueue(),
+ user, isAppRecovering, submissionContext.getReservationID(),
+ appPriority, placementContext);
}
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
String user, boolean isAppRecovering, ReservationId reservationID,
- Priority appPriority) {
+ Priority appPriority, ApplicationPlacementContext placementContext) {
super(SchedulerEventType.APP_ADDED);
this.applicationId = applicationId;
this.queue = queue;
@@ -60,6 +76,7 @@ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
this.reservationID = reservationID;
this.isAppRecovering = isAppRecovering;
this.appPriority = appPriority;
+ this.placementContext = placementContext;
}
public ApplicationId getApplicationId() {
@@ -85,4 +102,8 @@ public ReservationId getReservationID() {
public Priority getApplicatonPriority() {
return appPriority;
}
+
+ public ApplicationPlacementContext getPlacementContext() {
+ return placementContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 81793217c4c..ba948c8fd47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.MockApps;
@@ -70,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -84,7 +84,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -856,28 +855,33 @@ public void testEscapeApplicationSummary() {
Assert.assertTrue(msg.contains("preemptedResources=
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for creation and reinitilization of auto created leaf queues
+ * under a ManagedParentQueue.
+ */
+public class TestCapacitySchedulerAutoQueueCreation {
+
+ private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
+ private final int GB = 1024;
+ private final static ContainerUpdates NULL_UPDATE_REQUESTS =
+ new ContainerUpdates();
+
+ private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ private static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ private static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
+ private static final String A1 = A + ".a1";
+ private static final String A2 = A + ".a2";
+ private static final String B1 = B + ".b1";
+ private static final String B2 = B + ".b2";
+ private static final String B3 = B + ".b3";
+ private static final String C1 = C + ".c1";
+ private static final String C2 = C + ".c2";
+ private static final String C3 = C + ".c3";
+ private static float A_CAPACITY = 20f;
+ private static float B_CAPACITY = 40f;
+ private static float C_CAPACITY = 20f;
+ private static float D_CAPACITY = 20f;
+ private static float A1_CAPACITY = 30;
+ private static float A2_CAPACITY = 70;
+ private static float B1_CAPACITY = 60f;
+ private static float B2_CAPACITY = 20f;
+ private static float B3_CAPACITY = 20f;
+ private static float C1_CAPACITY = 20f;
+ private static float C2_CAPACITY = 20f;
+
+ private static String USER = "user_";
+ private static String USER0 = USER + 0;
+ private static String USER2 = USER + 2;
+ private static String PARENT_QUEUE = "c";
+
+ private MockRM mockRM = null;
+
+ private CapacityScheduler cs;
+
+ private final TestCapacityScheduler tcs = new TestCapacityScheduler();
+
+ private static SpyDispatcher dispatcher;
+
+ private static EventHandler
+ * root
+ * / \ \ \
+ * a b c d
+ * / \ / | \
+ * a1 a2 b1 b2 b3
+ */
+ private CapacitySchedulerConfiguration setupQueueConfiguration(
+ CapacitySchedulerConfiguration conf) {
+
+ //setup new queues with one of them auto enabled
+ // Define top-level queues
+ // Set childQueue for root
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b", "c", "d" });
+
+ conf.setCapacity(A, A_CAPACITY);
+ conf.setCapacity(B, B_CAPACITY);
+ conf.setCapacity(C, C_CAPACITY);
+ conf.setCapacity(D, D_CAPACITY);
+
+ // Define 2nd-level queues
+ conf.setQueues(A, new String[] { "a1", "a2" });
+ conf.setCapacity(A1, A1_CAPACITY);
+ conf.setUserLimitFactor(A1, 100.0f);
+ conf.setCapacity(A2, A2_CAPACITY);
+ conf.setUserLimitFactor(A2, 100.0f);
+
+ conf.setQueues(B, new String[] { "b1", "b2", "b3" });
+ conf.setCapacity(B1, B1_CAPACITY);
+ conf.setUserLimitFactor(B1, 100.0f);
+ conf.setCapacity(B2, B2_CAPACITY);
+ conf.setUserLimitFactor(B2, 100.0f);
+ conf.setCapacity(B3, B3_CAPACITY);
+ conf.setUserLimitFactor(B3, 100.0f);
+
+ conf.setUserLimitFactor(C, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(C, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f);
+
+ LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
+
+ conf.setUserLimitFactor(D, 1.0f);
+ conf.setAutoCreateChildQueueEnabled(D, true);
+
+ //Setup leaf queue template configs
+ conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f);
+ conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f);
+
+ LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
+
+ return conf;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mockRM != null) {
+ mockRM.stop();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testAutoCreateLeafQueueCreation() throws Exception {
+
+ try {
+ // submit an app
+ submitApp(cs, USER0, USER0, PARENT_QUEUE);
+
+ // check preconditions
+ List