diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java index 9794acd34f1..548cfe807be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java @@ -46,6 +46,8 @@ protected float capacity; protected float usedCapacity; protected float maxCapacity; + protected float weight; + protected float normalizedWeight; protected String queueName; protected CapacitySchedulerQueueInfoList queues; protected QueueCapacitiesInfo capacities; @@ -55,6 +57,7 @@ protected int queuePriority; protected String orderingPolicyInfo; protected String mode; + protected String queueType; @XmlTransient static final float EPSILON = 1e-8f; @@ -70,6 +73,8 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) { if (max < EPSILON || max > 1f) max = 1f; this.maxCapacity = max * 100; + this.weight = parent.getQueueCapacities().getWeight(); + this.normalizedWeight = parent.getQueueCapacities().getNormalizedWeight(); capacities = new QueueCapacitiesInfo(parent.getQueueCapacities(), parent.getQueueResourceQuotas(), false); @@ -101,6 +106,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) { .getConfigName(); } mode = CapacitySchedulerInfoHelper.getMode(parent); + queueType = CapacitySchedulerInfoHelper.getQueueType(parent); } public float getCapacity() { @@ -147,7 +153,7 @@ protected CapacitySchedulerQueueInfoList getQueues( CapacityScheduler cs, CSQueue parent) { CapacitySchedulerQueueInfoList queuesInfo = new CapacitySchedulerQueueInfoList(); - // JAXB marashalling leads to situation where the "type" field injected + // JAXB marshalling leads to situation where the "type" field injected // for JSON changes from string to array depending on order of printing // Issue gets fixed if all the leaf queues are marshalled before the // non-leaf queues. See YARN-4785 for more details. @@ -181,4 +187,7 @@ public String getMode() { return mode; } + public String getQueueType() { + return queueType; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java index 6fb0290b1d0..dec666bf949 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java @@ -66,6 +66,8 @@ protected float absoluteCapacity; protected float absoluteMaxCapacity; protected float absoluteUsedCapacity; + protected float weight; + protected float normalizedWeight; protected int numApplications; protected String queueName; protected boolean isAbsoluteResource; @@ -88,6 +90,7 @@ protected boolean autoCreateChildQueueEnabled; protected LeafQueueTemplateInfo leafQueueTemplate; protected String mode; + protected String queueType; CapacitySchedulerQueueInfo() { }; @@ -109,6 +112,8 @@ cap(q.getAbsoluteMaximumCapacity(), 0f, 1f) * 100; absoluteUsedCapacity = cap(q.getAbsoluteUsedCapacity(), 0f, 1f) * 100; + weight = q.getQueueCapacities().getWeight(); + normalizedWeight = q.getQueueCapacities().getNormalizedWeight(); numApplications = q.getNumApplications(); allocatedContainers = q.getMetrics().getAllocatedContainers(); pendingContainers = q.getMetrics().getPendingContainers(); @@ -131,6 +136,7 @@ populateQueueCapacities(qCapacities, qResQuotas); mode = CapacitySchedulerInfoHelper.getMode(q); + queueType = CapacitySchedulerInfoHelper.getQueueType(q); ResourceUsage queueResourceUsage = q.getQueueResourceUsage(); populateQueueResourceUsage(queueResourceUsage); @@ -314,4 +320,16 @@ public LeafQueueTemplateInfo getLeafQueueTemplate() { public String getMode() { return mode; } + + public String getQueueType() { + return queueType; + } + + public float getWeight() { + return weight; + } + + public float getNormalizedWeight() { + return normalizedWeight; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionQueueCapacitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionQueueCapacitiesInfo.java index cc4b565ef36..1b66808356d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionQueueCapacitiesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionQueueCapacitiesInfo.java @@ -39,6 +39,8 @@ private float absoluteUsedCapacity; private float absoluteMaxCapacity = 100; private float maxAMLimitPercentage; + private float weight; + private float normalizedWeight; private ResourceInfo configuredMinResource; private ResourceInfo configuredMaxResource; private ResourceInfo effectiveMinResource; @@ -50,6 +52,7 @@ public PartitionQueueCapacitiesInfo() { public PartitionQueueCapacitiesInfo(String partitionName, float capacity, float usedCapacity, float maxCapacity, float absCapacity, float absUsedCapacity, float absMaxCapacity, float maxAMLimitPercentage, + float weight, float normalizedWeight, Resource confMinRes, Resource confMaxRes, Resource effMinRes, Resource effMaxRes) { super(); @@ -61,6 +64,8 @@ public PartitionQueueCapacitiesInfo(String partitionName, float capacity, this.absoluteUsedCapacity = absUsedCapacity; this.absoluteMaxCapacity = absMaxCapacity; this.maxAMLimitPercentage = maxAMLimitPercentage; + this.weight = weight; + this.normalizedWeight = normalizedWeight; this.configuredMinResource = new ResourceInfo(confMinRes); this.configuredMaxResource = new ResourceInfo(confMaxRes); this.effectiveMinResource = new ResourceInfo(effMinRes); @@ -127,6 +132,22 @@ public float getMaxAMLimitPercentage() { return maxAMLimitPercentage; } + public float getWeight() { + return weight; + } + + public void setWeight(float weight) { + this.weight = weight; + } + + public float getNormalizedWeight() { + return normalizedWeight; + } + + public void setNormalizedWeight(float normalizedWeight) { + this.normalizedWeight = normalizedWeight; + } + public void setMaxAMLimitPercentage(float maxAMLimitPercentage) { this.maxAMLimitPercentage = maxAMLimitPercentage; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueCapacitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueCapacitiesInfo.java index 35c80d2ea4a..3c29f505d8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueCapacitiesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/QueueCapacitiesInfo.java @@ -52,6 +52,8 @@ public QueueCapacitiesInfo(QueueCapacities capacities, float absUsedCapacity; float absMaxCapacity; float maxAMLimitPercentage; + float weight; + float normalizedWeight; for (String partitionName : capacities.getExistingNodeLabels()) { usedCapacity = capacities.getUsedCapacity(partitionName) * 100; capacity = capacities.getCapacity(partitionName) * 100; @@ -67,10 +69,13 @@ public QueueCapacitiesInfo(QueueCapacities capacities, if (maxCapacity < CapacitySchedulerQueueInfo.EPSILON || maxCapacity > 1f) maxCapacity = 1f; maxCapacity = maxCapacity * 100; + weight = capacities.getWeight(partitionName); + normalizedWeight = capacities.getNormalizedWeight(partitionName); queueCapacitiesByPartition.add(new PartitionQueueCapacitiesInfo( partitionName, capacity, usedCapacity, maxCapacity, absCapacity, absUsedCapacity, absMaxCapacity, considerAMUsage ? maxAMLimitPercentage : 0f, + weight, normalizedWeight, resourceQuotas.getConfiguredMinResource(partitionName), resourceQuotas.getConfiguredMaxResource(partitionName), resourceQuotas.getEffectiveMinResource(partitionName), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java index 9727f9ffd66..52a5a5cf2a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java @@ -18,9 +18,17 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; public class CapacitySchedulerInfoHelper { + private static final String AUTO_CREATED_LEAF = "autoCreatedLeaf"; + private static final String STATIC_LEAF = "staticLeaf"; + private static final String AUTO_CREATED_PARENT = "autoCreatedParent"; + private static final String STATIC_PARENT = "staticParent"; + private static final String UNKNOWN_QUEUE = "unknown"; private CapacitySchedulerInfoHelper() {} @@ -41,4 +49,22 @@ public static String getMode(CSQueue queue) throws YarnRuntimeException { throw new YarnRuntimeException("Unknown mode for queue: " + queue.getQueuePath() + ". Queue details: " + queue); } + + public static String getQueueType(CSQueue queue) { + if (queue instanceof LeafQueue) { + if (((AbstractCSQueue)queue).isDynamicQueue()) { + return AUTO_CREATED_LEAF; + } else { + return STATIC_LEAF; + } + } else if (queue instanceof ParentQueue) { + if (((AbstractCSQueue)queue).isDynamicQueue()) { + return AUTO_CREATED_PARENT; + } else { + //A ParentQueue with isDynamic=false or an AbstractManagedParentQueue + return STATIC_PARENT; + } + } + return UNKNOWN_QUEUE; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java index 8ee00296e6a..a2b744b6070 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java @@ -35,11 +35,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerAutoQueueHandler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.JerseyTestBase; @@ -63,8 +68,35 @@ JerseyTestBase { private static final Logger LOG = LoggerFactory.getLogger(TestRMWebServicesCapacitySchedDynamicConfig.class); - - protected static MockRM rm; + private static final float EXP_WEIGHT_NON_WEIGHT_MODE = -1.0F; + private static final float EXP_NORM_WEIGHT_NON_WEIGHT_MODE = 0.0F; + private static final float EXP_ROOT_WEIGHT_IN_WEIGHT_MODE = 1.0F; + private static final float EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE = 1.0F; + private static final double DELTA = 0.00001; + private static final String STATIC_PARENT = "staticParent"; + private static final String STATIC_LEAF = "staticLeaf"; + private static final int GB = 1024; + private static final String AUTO_CREATED_LEAF = "autoCreatedLeaf"; + private static final String AUTO_CREATED_PARENT = "autoCreatedParent"; + protected static MockRM RM; + + private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private CapacitySchedulerConfiguration csConf; + + private static class ExpectedQueueWithProperties { + private String path; + public final float weight; + public final float normalizedWeight; + private String queueType; + + public ExpectedQueueWithProperties(String path, float weight, + float normalizedWeight, String queueType) { + this.path = path; + this.weight = weight; + this.normalizedWeight = normalizedWeight; + this.queueType = queueType; + } + } private static class WebServletModule extends ServletModule { private final Configuration conf; @@ -82,8 +114,8 @@ protected void configureServlets() { ResourceScheduler.class); conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); - rm = new MockRM(conf); - bind(ResourceManager.class).toInstance(rm); + RM = new MockRM(conf); + bind(ResourceManager.class).toInstance(RM); serve("/*").with(GuiceContainer.class); } } @@ -97,13 +129,15 @@ public void setUp() throws Exception { private void initResourceManager(Configuration conf) throws IOException { GuiceServletConfig.setInjector( Guice.createInjector(new WebServletModule(conf))); - rm.start(); + RM.start(); //Need to call reinitialize as //MutableCSConfigurationProvider with InMemoryConfigurationStore //somehow does not load the queues properly and falls back to default config. //Therefore CS will think there's only the default queue there. - ((CapacityScheduler)rm.getResourceScheduler()).reinitialize(conf, - rm.getRMContext(), true); + ((CapacityScheduler) RM.getResourceScheduler()).reinitialize(conf, + RM.getRMContext(), true); + CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler(); + csConf = cs.getConfiguration(); } public TestRMWebServicesCapacitySchedDynamicConfig() { @@ -124,8 +158,19 @@ public void testSchedulerResponsePercentageMode() initResourceManager(config); JSONObject json = sendRequestToSchedulerEndpoint(); - validateSchedulerInfo(json, "percentage", "root.default", "root.test1", - "root.test2"); + validateSchedulerInfo(json, "percentage", + new ExpectedQueueWithProperties("root", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_PARENT), + new ExpectedQueueWithProperties("root.default", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test1", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test2", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF)); } @Test @@ -138,8 +183,19 @@ public void testSchedulerResponseAbsoluteMode() initResourceManager(config); JSONObject json = sendRequestToSchedulerEndpoint(); - validateSchedulerInfo(json, "absolute", "root.default", "root.test1", - "root.test2"); + validateSchedulerInfo(json, "absolute", + new ExpectedQueueWithProperties("root", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_PARENT), + new ExpectedQueueWithProperties("root.default", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test1", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test2", + EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE, + STATIC_LEAF)); } @Test @@ -152,8 +208,101 @@ public void testSchedulerResponseWeightMode() initResourceManager(config); JSONObject json = sendRequestToSchedulerEndpoint(); - validateSchedulerInfo(json, "weight", "root.default", "root.test1", - "root.test2"); + validateSchedulerInfo(json, "weight", + new ExpectedQueueWithProperties("root", + EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, + STATIC_PARENT), + new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f, + STATIC_LEAF)); + } + + @Test + public void testSchedulerResponseWeightModeWithAutoCreatedQueues() + throws Exception { + Configuration config = CSConfigGenerator + .createWeightConfigWithAutoQueueCreationEnabled(); + config.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + + initResourceManager(config); + initAutoQueueHandler(); + JSONObject json = sendRequestToSchedulerEndpoint(); + validateSchedulerInfo(json, "weight", + new ExpectedQueueWithProperties("root", + EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, + STATIC_PARENT), + new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f, + STATIC_LEAF)); + + //Now create some auto created queues + createQueue("root.auto1"); + createQueue("root.auto2"); + createQueue("root.auto3"); + createQueue("root.autoParent1.auto4"); + + json = sendRequestToSchedulerEndpoint(); + //root.auto1=1w, root.auto2=1w, root.auto3=1w + //root.default=10w, root.test1=4w, root.test2=6w + //root.autoparent1=1w + int sumOfWeights = 24; + ExpectedQueueWithProperties expectedRootQ = + new ExpectedQueueWithProperties("root", + EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, + STATIC_PARENT); + validateSchedulerInfo(json, "weight", + expectedRootQ, + new ExpectedQueueWithProperties("root.auto1", + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights, + AUTO_CREATED_LEAF), + new ExpectedQueueWithProperties("root.auto2", + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights, + AUTO_CREATED_LEAF), + new ExpectedQueueWithProperties("root.auto3", + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights, + AUTO_CREATED_LEAF), + new ExpectedQueueWithProperties("root.autoParent1", + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights, + AUTO_CREATED_PARENT), + new ExpectedQueueWithProperties("root.default", 10.0f, + 10.0f / sumOfWeights, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test1", 4.0f, + 4.0f / sumOfWeights, + STATIC_LEAF), + new ExpectedQueueWithProperties("root.test2", 6.0f, + 6.0f / sumOfWeights, + STATIC_LEAF)); + + validateChildrenOfParent(json, "root.autoParent1", "weight", + expectedRootQ, + new ExpectedQueueWithProperties("root.autoParent1.auto4", + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE, + AUTO_CREATED_LEAF)); + } + + private void initAutoQueueHandler() throws Exception { + CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler(); + autoQueueHandler = new CapacitySchedulerAutoQueueHandler( + cs.getCapacitySchedulerQueueManager(), csConf); + MockNM nm1 = RM.registerNode("h1:1234", 1200 * GB); // label = x + } + + private LeafQueue createQueue(String queuePath) throws YarnException { + return autoQueueHandler.autoCreateQueue( + CSQueueUtils.extractQueuePath(queuePath)); } private JSONObject sendRequestToSchedulerEndpoint() throws Exception { @@ -169,41 +318,129 @@ private JSONObject sendRequestToSchedulerEndpoint() throws Exception { } private void validateSchedulerInfo(JSONObject json, String expectedMode, - String... expectedQueues) throws JSONException { - int expectedQSize = expectedQueues.length; + ExpectedQueueWithProperties rootQueue, + ExpectedQueueWithProperties... expectedQueues) throws JSONException { + Assert.assertNotNull("SchedulerTypeInfo should not be null", json); + assertEquals("incorrect number of elements in: " + json, 1, json.length()); + + JSONObject info = verifySchedulerJSONObject(json); + info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info); + JSONArray queueArray = verifyQueueJSONListObject(info, + expectedQueues.length); + verifyQueues(CapacitySchedulerConfiguration.ROOT, expectedMode, + queueArray, expectedQueues); + } + + private void validateChildrenOfParent(JSONObject json, + String parentPath, String expectedMode, + ExpectedQueueWithProperties rootQueue, + ExpectedQueueWithProperties... expectedLeafQueues) throws JSONException { Assert.assertNotNull("SchedulerTypeInfo should not be null", json); assertEquals("incorrect number of elements in: " + json, 1, json.length()); + JSONObject info = verifySchedulerJSONObject(json); + info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info); + JSONArray queueArray = getQueuesJSONListObject(info); + + Set verifiedQueues = new HashSet<>(); + for (int i = 0; i < queueArray.length(); i++) { + JSONObject childQueueObj = queueArray.getJSONObject(i); + String queuePath = CapacitySchedulerConfiguration.ROOT + "." + + childQueueObj.getString("queueName"); + if (queuePath.equals(parentPath)) { + JSONArray childQueueArray = verifyQueueJSONListObject(childQueueObj, + expectedLeafQueues.length); + verifyQueues(parentPath, expectedMode, childQueueArray, + expectedLeafQueues); + verifiedQueues.add(queuePath); + } + } + + Assert.assertEquals("Not all child queues were found. " + + String.format("Found queues: %s, All queues: %s", verifiedQueues, + Arrays.stream(expectedLeafQueues).map(lq -> lq.path) + .collect(Collectors.toList())), + expectedLeafQueues.length, verifiedQueues.size()); + } + + private JSONObject verifySchedulerJSONObject(JSONObject json) + throws JSONException { JSONObject info = json.getJSONObject("scheduler"); Assert.assertNotNull("Scheduler object should not be null", json); assertEquals("incorrect number of elements in: " + info, 1, info.length()); + return info; + } - //Validate if root queue has the expected mode + private JSONObject verifySchedulerInfoJSONObject(String expectedMode, + ExpectedQueueWithProperties rootQueue, JSONObject info) + throws JSONException { + //Validate if root queue has the expected mode and weight values info = info.getJSONObject("schedulerInfo"); Assert.assertNotNull("SchedulerInfo should not be null", info); - Assert.assertEquals("Expected Queue mode " +expectedMode, expectedMode, + Assert.assertEquals("Expected Queue mode " + expectedMode, expectedMode, info.getString("mode")); + Assert.assertEquals(rootQueue.weight, + Float.parseFloat(info.getString("weight")), DELTA); + Assert.assertEquals(rootQueue.normalizedWeight, + Float.parseFloat(info.getString("normalizedWeight")), DELTA); + return info; + } + private JSONArray verifyQueueJSONListObject(JSONObject info, + int expectedQSize) throws JSONException { + JSONArray queueArray = getQueuesJSONListObject(info); + assertEquals("QueueInfoList should be size of " + expectedQSize, + expectedQSize, queueArray.length()); + return queueArray; + } + + private JSONArray getQueuesJSONListObject(JSONObject info) + throws JSONException { JSONObject queuesObj = info.getJSONObject("queues"); Assert.assertNotNull("QueueInfoList should not be null", queuesObj); JSONArray queueArray = queuesObj.getJSONArray("queue"); Assert.assertNotNull("Queue list should not be null", queueArray); - assertEquals("QueueInfoList should be size of " + expectedQSize, - expectedQSize, queueArray.length()); + return queueArray; + } + + private void verifyQueues(String parentPath, String expectedMode, + JSONArray queueArray, ExpectedQueueWithProperties[] expectedQueues) + throws JSONException { + Map queuesMap = new HashMap<>(); + for (ExpectedQueueWithProperties expectedQueue : expectedQueues) { + queuesMap.put(expectedQueue.path, expectedQueue); + } // Create mapping of queue path -> mode Map modesMap = new HashMap<>(); for (int i = 0; i < queueArray.length(); i++) { JSONObject obj = queueArray.getJSONObject(i); - String queuePath = CapacitySchedulerConfiguration.ROOT + "." + - obj.getString("queueName"); + String queuePath = parentPath + "." + obj.getString("queueName"); String mode = obj.getString("mode"); modesMap.put(queuePath, mode); + + //validate weights of all other queues + ExpectedQueueWithProperties expectedQueue = queuesMap.get(queuePath); + Assert.assertNotNull("Queue not found in expectedQueueMap with path: " + + queuePath, expectedQueue); + Assert.assertEquals("Weight value does not match", + expectedQueue.weight, Float.parseFloat(obj.getString("weight")), + DELTA); + Assert.assertEquals("Normalized weight value does not match for queue " + + queuePath, + expectedQueue.normalizedWeight, + Float.parseFloat(obj.getString("normalizedWeight")), DELTA); + + //validate queue creation type + Assert.assertEquals("Queue creation type does not match for queue " + + queuePath, + expectedQueue.queueType, obj.getString("queueType")); } //Validate queue paths and modes List sortedExpectedPaths = Arrays.stream(expectedQueues) + .map(eq -> eq.path) .sorted(Comparator.comparing(String::toLowerCase)) .collect(Collectors.toList()); @@ -250,6 +487,14 @@ public static Configuration createAbsoluteConfig() { } public static Configuration createWeightConfig() { + return createWeightConfigInternal(false); + } + + public static Configuration createWeightConfigWithAutoQueueCreationEnabled() { + return createWeightConfigInternal(true); + } + + private static Configuration createWeightConfigInternal(boolean enableAqc) { Map conf = new HashMap<>(); conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2"); conf.put("yarn.scheduler.capacity.root.capacity", "1w"); @@ -258,9 +503,16 @@ public static Configuration createWeightConfig() { conf.put("yarn.scheduler.capacity.root.test2.capacity", "6w"); conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); + + if (enableAqc) { + conf.put("yarn.scheduler.capacity.root.auto-queue-creation-v2.enabled", + "true"); + conf.put("yarn.scheduler.capacity.root.default." + + "auto-queue-creation-v2.enabled", "true"); + } return createConfiguration(conf); } - + public static Configuration createConfiguration( Map configs) { Configuration config = new Configuration();