diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9e7b0d8f634..0ce67bbb62d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -128,7 +128,7 @@ // either at this level or anywhere in the queue's hierarchy. private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false; - protected enum CapacityConfigType { + public enum CapacityConfigType { // FIXME, from what I can see, Percentage mode can almost apply to weighted // and percentage mode at the same time, there's only small area need to be // changed, we need to rename "PERCENTAGE" to "PERCENTAGE" and "WEIGHT" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java index 8de752901fa..aca5ed6f7b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper; import java.util.ArrayList; import java.util.Map; @@ -53,6 +54,7 @@ protected QueueAclsInfo queueAcls; protected int queuePriority; protected String orderingPolicyInfo; + protected String mode; @XmlTransient static final float EPSILON = 1e-8f; @@ -98,6 +100,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) { orderingPolicyInfo = ((ParentQueue) parent).getQueueOrderingPolicy() .getConfigName(); } + mode = CapacitySchedulerInfoHelper.getMode(parent); } public float getCapacity() { @@ -173,4 +176,8 @@ protected CapacitySchedulerQueueInfoList getQueues( } return queuesInfo; } + + public String getMode() { + return mode; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java index 897f94043a1..6fb0290b1d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity. CapacitySchedulerConfiguration.RESOURCE_PATTERN; @@ -86,6 +87,7 @@ protected String orderingPolicyInfo; protected boolean autoCreateChildQueueEnabled; protected LeafQueueTemplateInfo leafQueueTemplate; + protected String mode; CapacitySchedulerQueueInfo() { }; @@ -128,6 +130,8 @@ QueueResourceQuotas qResQuotas = q.getQueueResourceQuotas(); populateQueueCapacities(qCapacities, qResQuotas); + mode = CapacitySchedulerInfoHelper.getMode(q); + ResourceUsage queueResourceUsage = q.getQueueResourceUsage(); populateQueueResourceUsage(queueResourceUsage); @@ -306,4 +310,8 @@ public boolean isAutoCreateChildQueueEnabled() { public LeafQueueTemplateInfo getLeafQueueTemplate() { return leafQueueTemplate; } + + public String getMode() { + return mode; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java new file mode 100644 index 00000000000..6f0a9fc2be8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper; + +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class CapacitySchedulerInfoHelper { + + public static String getMode(CSQueue queue) throws YarnRuntimeException { + if (queue.getCapacityConfigType() == AbstractCSQueue.CapacityConfigType.ABSOLUTE_RESOURCE) { + return "absolute"; + } else if (queue.getCapacityConfigType() == AbstractCSQueue.CapacityConfigType.PERCENTAGE) { + float weight = queue.getQueueCapacities().getWeight(); + if (weight == -1) { + //-1 indicates we are not in weight mode + return "percentage"; + } else { + return "weight"; + } + } + throw new YarnRuntimeException("Unknown mode for queue: " + queue.getQueuePath() + + ". Queue details: " + queue); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java index 1477a336763..745f6c76a09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigGeneratorForTest.java @@ -46,7 +46,42 @@ public static Configuration createBasicCSConfiguration() { conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); conf.put("yarn.scheduler.capacity.queue-mappings", - "u:test1:test1,u:test2:test2"); + "u:test1:root.test1,u:test2:root.test2"); + return createConfiguration(conf); + } + + public static Configuration createBasicCSConfigurationPercentage() { + Map conf = new HashMap<>(); + conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2"); + conf.put("yarn.scheduler.capacity.root.test1.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test2.capacity", "50"); + conf.put("yarn.scheduler.capacity.root.test1.maximum-capacity", "100"); + conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); + return createConfiguration(conf); + } + + public static Configuration createBasicCSConfigurationAbsolute() { + Map conf = new HashMap<>(); + conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2"); + conf.put("yarn.scheduler.capacity.root.capacity", "[memory=6136,vcores=30]"); + conf.put("yarn.scheduler.capacity.root.default.capacity", "[memory=3064,vcores=15]"); + conf.put("yarn.scheduler.capacity.root.test1.capacity", "[memory=2048,vcores=10]"); + conf.put("yarn.scheduler.capacity.root.test2.capacity", "[memory=1024,vcores=5]"); + conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); + return createConfiguration(conf); + } + + public static Configuration createBasicCSConfigurationWeight() { + Map conf = new HashMap<>(); + conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2"); + conf.put("yarn.scheduler.capacity.root.capacity", "1w"); + conf.put("yarn.scheduler.capacity.root.default.capacity", "10w"); + conf.put("yarn.scheduler.capacity.root.test1.capacity", "4w"); + conf.put("yarn.scheduler.capacity.root.test2.capacity", "6w"); + conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); + conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); return createConfiguration(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java index 5785b141144..36e89278441 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java @@ -34,12 +34,15 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -57,6 +60,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -73,6 +77,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.*; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -85,10 +90,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.AdHocLogDumper; import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; @@ -1060,6 +1071,125 @@ public void testValidateAndGetSchedulerConfigurationValidScheduler() .getStatusCode(), response.getStatus()); } + @Test + public void testSchedulerResponsePercentageMode() + throws Exception { + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfigurationPercentage(); + config.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + + RMWebServices webService = schedulerResponseInitScheduler(config); + SchedulerTypeInfo info = webService.getSchedulerInfo(); + validateSchedulerInfo(info, "percentage", "root.default", "root.test1", "root.test2"); + } + + @Test + public void testSchedulerResponseAbsoluteMode() + throws Exception { + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfigurationAbsolute(); + config.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + + RMWebServices webService = schedulerResponseInitScheduler(config); + SchedulerTypeInfo info = webService.getSchedulerInfo(); + validateSchedulerInfo(info, "absolute", "root.default", "root.test1", "root.test2"); + schedulerResponsePrintJson(); + } + + private void schedulerResponsePrintJson() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1").path("cluster").path("scheduler") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + + assertEquals(MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + response.getType().toString()); + } + + @Test + public void testSchedulerResponseWeightMode() + throws Exception { + Configuration config = CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfigurationWeight(); + config.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.MEMORY_CONFIGURATION_STORE); + + RMWebServices webService = schedulerResponseInitScheduler(config); + SchedulerTypeInfo info = webService.getSchedulerInfo(); + validateSchedulerInfo(info, "weight", "root.default", "root.test1", "root.test2"); + schedulerResponsePrintJson(); + } + + private RMWebServices schedulerResponseInitScheduler(Configuration config) throws Exception { + CapacityScheduler scheduler = new CapacityScheduler(); + RMContext rmContext = prepareRMContext(config); + scheduler.setRMContext(rmContext); + scheduler.setConf(config); + scheduler.serviceInit(config); + + //Need to call reinitialize as + //MutableCSConfigurationProvider with InMemoryConfigurationStore + //somehow does not load the queues properly and falls back to default config. + //Therefore CS will think there's only the default queue there. + scheduler.reinitialize(config, rmContext, true); + return prepareWebServiceForValidation(scheduler); + } + + private void validateSchedulerInfo(SchedulerTypeInfo info, String expectedMode, String... expectedQueues) { + int expectedQSize = expectedQueues.length; + + Assert.assertNotNull("SchedulerTypeInfo should not be null", info); + + SchedulerInfo schedulerInfo = info.getSchedulerInfo(); + Assert.assertNotNull("SchedulerInfo should not be null", schedulerInfo); + Assert.assertTrue("SchedulerInfo should be instance of " + CapacitySchedulerInfo.class, schedulerInfo instanceof CapacitySchedulerInfo); + + CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo; + CapacitySchedulerQueueInfoList queues = capacitySchedulerInfo.getQueues(); + Assert.assertNotNull("Queues should not be null", queues); + + //Validate if root has the expected mode + Assert.assertEquals("Expected Queue mode " + expectedMode, expectedMode, capacitySchedulerInfo.getMode()); + + List qInfoList = queues.getQueueInfoList(); + Assert.assertNotNull("QueueInfoList should not be null", qInfoList); + Assert.assertEquals("QueueInfoList should be size of " + expectedQSize, expectedQSize, qInfoList.size()); + + List sortedQueueInfos = qInfoList.stream() + .sorted(Comparator.comparing(CapacitySchedulerQueueInfo::getQueuePath)) + .collect(Collectors.toList()); + + List queuePaths = + sortedQueueInfos.stream().map(CapacitySchedulerQueueInfo::getQueuePath).collect(Collectors.toList()); + Assert.assertEquals("Expected Queue paths: " + Arrays.toString(expectedQueues), Arrays.asList(expectedQueues), queuePaths); + + Map modesMap = + sortedQueueInfos.stream().collect(Collectors.toMap(CapacitySchedulerQueueInfo::getQueuePath, CapacitySchedulerQueueInfo::getMode)); + Set modesSet = new HashSet<>(modesMap.values()); + Assert.assertEquals("Expected a single Queue mode for all queues: " + + expectedMode + ", got: " + modesMap, 1, modesSet.size()); + Assert.assertEquals("Expected Queue mode " + expectedMode, expectedMode, modesSet.iterator().next()); + } + + public static RMContext prepareRMContext(Configuration config) { + RMContext rmContext = mock(RMContext.class); + LocalConfigurationProvider configProvider = mock(LocalConfigurationProvider.class); + when(rmContext.getConfigurationProvider()) + .thenReturn(configProvider); + RMNodeLabelsManager nodeLabelsManager = mock(RMNodeLabelsManager.class); + when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager); + Resource partitionResource = Resources.createResource(1024, 1); + when(nodeLabelsManager + .getResourceByLabel(any(), any())) + .thenReturn(partitionResource); + PlacementManager queuePlacementManager = mock(PlacementManager.class); + when(rmContext.getQueuePlacementManager()) + .thenReturn(queuePlacementManager); + when(rmContext.getYarnConfiguration()).thenReturn(config); + return rmContext; + } + private CapacityScheduler prepareCSForValidation(Configuration config) throws IOException { CapacityScheduler scheduler = mock(CapacityScheduler.class);