diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index faaddd5..edd52fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -151,8 +151,11 @@ public static Resource toResource(ProfileCapability capability, Preconditions.checkArgument(resourceProfilesMap != null, "Resource profiles map cannot be null"); Resource resource = Resource.newInstance(0, 0); - - if (resourceProfilesMap.containsKey(capability.getProfileName())) { + String tmp = capability.getProfileName(); + if (tmp.isEmpty()) { + tmp = DEFAULT_PROFILE; + } + if (resourceProfilesMap.containsKey(tmp)) { resource = Resource .newInstance(resourceProfilesMap.get(capability.getProfileName())); } @@ -160,7 +163,7 @@ public static Resource toResource(ProfileCapability capability, if(capability.getProfileCapabilityOverride()!= null) { for (Map.Entry entry : capability .getProfileCapabilityOverride().getResources().entrySet()) { - if (entry.getValue() != null && entry.getValue().getValue() != 0) { + if (entry.getValue() != null && entry.getValue().getValue() >= 0) { resource.setResourceInformation(entry.getKey(), entry.getValue()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java new file mode 100644 index 0000000..558e075 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +/** + * This exception is thrown when the client requests information about + * ResourceProfiles in the + * {@link org.apache.hadoop.yarn.api.ApplicationClientProtocol} but resource + * profiles is not enabled on the RM. + * + */ +public class ResourceProfilesNotEnabledException extends YarnException { + + private static final long serialVersionUID = 13498237L; + + public ResourceProfilesNotEnabledException(Throwable cause) { + super(cause); + } + + public ResourceProfilesNotEnabledException(String message) { + super(message); + } + + public ResourceProfilesNotEnabledException(String message, Throwable cause) { + super(message, cause); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 17dae6b..62b3c67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; @@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -226,12 +228,18 @@ @VisibleForTesting protected int numTotalContainers = 1; // Memory to request for the container on which the shell command will run - private long containerMemory = 10; + private static long DEFAULT_CONTAINER_MEMORY = 10; + private long containerMemory = DEFAULT_CONTAINER_MEMORY; // VirtualCores to request for the container on which the shell command will run - private int containerVirtualCores = 1; + private static int DEFAULT_CONTAINER_VCORES = 1; + private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; // Priority of the request private int requestPriority; + // Resource profile for the container + private String containerResourceProfile; + Map resourceProfiles; + // Counter for completed containers ( complete denotes successful or failed ) private AtomicInteger numCompletedContainers = new AtomicInteger(); // Allocated container count so that we know how many containers has the RM @@ -387,6 +395,8 @@ public boolean init(String[] args) throws ParseException, IOException { "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resource_profile", true, + "Resource profile to be requested to run the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); @@ -528,9 +538,11 @@ public boolean init(String[] args) throws ParseException, IOException { } containerMemory = Integer.parseInt(cliParser.getOptionValue( - "container_memory", "10")); + "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( - "container_vcores", "1")); + "container_vcores", "-1")); + containerResourceProfile = + cliParser.getOptionValue("container_resource_profile", ""); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( "num_containers", "1")); if (numTotalContainers == 0) { @@ -647,6 +659,7 @@ public void run() throws YarnException, IOException, InterruptedException { RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); + resourceProfiles = response.getResourceProfiles(); // Dump out information about cluster capability as seen by the // resource manager long maxMem = response.getMaximumResourceCapability().getMemorySize(); @@ -1185,12 +1198,8 @@ private ContainerRequest setupContainerAskForRM() { Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements - // For now, memory and CPU are supported so we set memory and cpu requirements - Resource capability = Resource.newInstance(containerMemory, - containerVirtualCores); - - ContainerRequest request = new ContainerRequest(capability, null, null, - pri); + ContainerRequest request = + new ContainerRequest(createProfileCapability(), null, null, pri); LOG.info("Requested container ask: " + request.toString()); return request; } @@ -1451,4 +1460,36 @@ public TimelinePutResponse run() throws Exception { } } + private ProfileCapability createProfileCapability() + throws YarnRuntimeException { + if (containerMemory < -1 || containerMemory == 0) { + throw new YarnRuntimeException("Value of AM memory '" + containerMemory + + "' has to be greater than 0"); + } + if (containerVirtualCores < -1 || containerVirtualCores == 0) { + throw new YarnRuntimeException( + "Value of AM vcores '" + containerVirtualCores + + "' has to be greater than 0"); + } + + Resource resourceCapability = + Resource.newInstance(containerMemory, containerVirtualCores); + if (resourceProfiles == null) { + containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY : + containerVirtualCores; + containerVirtualCores = + containerVirtualCores == -1 ? DEFAULT_CONTAINER_VCORES : + containerVirtualCores; + resourceCapability.setMemorySize(containerMemory); + resourceCapability.setVirtualCores(containerVirtualCores); + } + + String tmp = containerResourceProfile; + if (containerResourceProfile.isEmpty() && resourceProfiles != null) { + tmp = "default"; + } + ProfileCapability capability = + ProfileCapability.newInstance(tmp, resourceCapability); + return capability; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index eedb501..0370b874 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -66,10 +66,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -79,8 +81,9 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** @@ -119,6 +122,11 @@ public class Client { private static final Log LOG = LogFactory.getLog(Client.class); + + private static final int DEFAULT_AM_MEMORY = 100; + private static final int DEFAULT_AM_VCORES = 1; + private static final int DEFAULT_CONTAINER_MEMORY = 10; + private static final int DEFAULT_CONTAINER_VCORES = 1; // Configuration private Configuration conf; @@ -130,9 +138,12 @@ // Queue for App master private String amQueue = ""; // Amt. of memory resource to request for to run the App Master - private long amMemory = 100; + private long amMemory = DEFAULT_AM_MEMORY; // Amt. of virtual core resource to request for to run the App Master - private int amVCores = 1; + private int amVCores = DEFAULT_AM_VCORES; + + // AM resource profile + private String amResourceProfile = ""; // Application master jar file private String appMasterJar = ""; @@ -151,9 +162,11 @@ private int shellCmdPriority = 0; // Amt of memory to request for container in which shell script will be executed - private int containerMemory = 10; + private long containerMemory = DEFAULT_CONTAINER_MEMORY; // Amt. of virtual cores to request for container in which shell script will be executed - private int containerVirtualCores = 1; + private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // container resource profile + private String containerResourceProfile = ""; // No. of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; @@ -256,6 +269,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); opts.addOption("jar", true, "Jar file containing the application master"); + opts.addOption("master_resource_profile", true, "Resource profile for the application master"); opts.addOption("shell_command", true, "Shell command to be executed by " + "the Application Master. Can only specify either --shell_command " + "or --shell_script"); @@ -269,6 +283,7 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("keep_containers_across_application_attempts", false, @@ -372,17 +387,9 @@ public boolean init(String[] args) throws ParseException { appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); - amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100")); - amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); - - if (amMemory < 0) { - throw new IllegalArgumentException("Invalid memory specified for application master, exiting." - + " Specified memory=" + amMemory); - } - if (amVCores < 0) { - throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." - + " Specified virtual cores=" + amVCores); - } + amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); + amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); + amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); if (!cliParser.hasOption("jar")) { throw new IllegalArgumentException("No jar file specified for application master"); @@ -423,17 +430,14 @@ public boolean init(String[] args) throws ParseException { } shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); - containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); - containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); + containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); + containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); + containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - - if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { - throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," - + " exiting." - + " Specified containerMemory=" + containerMemory - + ", containerVirtualCores=" + containerVirtualCores - + ", numContainer=" + numContainers); + if (numContainers < 1) { + throw new IllegalArgumentException("Invalid no. of containers specified," + + " exiting. Specified numContainer=" + numContainers); } nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); @@ -540,6 +544,32 @@ public boolean run() throws IOException, YarnException { prepareTimelineDomain(); } + Map profiles; + try { + profiles = yarnClient.getResourceProfiles(); + } catch (ResourceProfilesNotEnabledException re) { + profiles = null; + } + + List appProfiles = new ArrayList<>(2); + appProfiles.add(amResourceProfile); + appProfiles.add(containerResourceProfile); + for (String profile : appProfiles) { + if (profile != null && !profile.isEmpty()) { + if (profiles == null) { + String message = "Resource profiles is not enabled"; + LOG.error(message); + throw new IOException(message); + } + if (!profiles.containsKey(profile)) { + String message = "Unknown resource profile '" + profile + + "'. Valid resource profiles are " + profiles.keySet(); + LOG.error(message); + throw new IOException(message); + } + } + } + // Get a new application id YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); @@ -573,6 +603,13 @@ public boolean run() throws IOException, YarnException { ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); + // Set up resource type requirements + // For now, both memory and vcores are supported, so we set memory and + // vcores requirements + setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile, + amPriority, profiles); + setContainerResources(containerMemory, containerVirtualCores, profiles); + appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); @@ -696,8 +733,16 @@ public boolean run() throws IOException, YarnException { // Set class name vargs.add(appMasterMainClass); // Set params for Application Master - vargs.add("--container_memory " + String.valueOf(containerMemory)); - vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); + if (containerMemory > 0) { + vargs.add("--container_memory " + String.valueOf(containerMemory)); + } + if (containerVirtualCores > 0) { + vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); + } + if (containerResourceProfile != null && !containerResourceProfile + .isEmpty()) { + vargs.add("--container_resource_profile " + containerResourceProfile); + } vargs.add("--num_containers " + String.valueOf(numContainers)); if (null != nodeLabelExpression) { appContext.setNodeLabelExpression(nodeLabelExpression); @@ -730,12 +775,6 @@ public boolean run() throws IOException, YarnException { ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); - // Set up resource type requirements - // For now, both memory and vcores are supported, so we set memory and - // vcores requirements - Resource capability = Resource.newInstance(amMemory, amVCores); - appContext.setResource(capability); - // Service data is a binary blob that can be passed to the application // Not needed in this scenario // amContainer.setServiceData(serviceData); @@ -933,4 +972,63 @@ private void prepareTimelineDomain() { timelineClient.stop(); } } + + private void setAMResourceCapability(ApplicationSubmissionContext appContext, + long memory, int vcores, String profile, int priority, + Map profiles) throws IOException { + if (memory < -1 || memory == 0) { + throw new IOException( + "Value of AM memory '" + memory + "' has to be greater than 0"); + } + if (vcores < -1 || vcores == 0) { + throw new IOException( + "Value of AM vcores '" + vcores + "' has to be greater than 0"); + } + String tmp = profile; + if (profile.isEmpty()) { + tmp = "default"; + } + if (appContext.getAMContainerResourceRequest() == null) { + appContext.setAMContainerResourceRequest(ResourceRequest + .newInstance(Priority.newInstance(priority), "*", + Resources.clone(Resources.none()), 1)); + } + + if (appContext.getAMContainerResourceRequest().getProfileCapability() + == null) { + appContext.getAMContainerResourceRequest().setProfileCapability( + ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); + } + Resource capability = Resource.newInstance(0, 0); + // set amMemory because it's used to set Xmx param + if (profiles == null) { + amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; + amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores; + capability.setMemorySize(amMemory); + capability.setVirtualCores(amVCores); + } else { + amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory; + amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores; + capability.setMemorySize(memory); + capability.setVirtualCores(vcores); + } + appContext.getAMContainerResourceRequest().getProfileCapability() + .setProfileCapabilityOverride(capability); + } + + private void setContainerResources(long memory, int vcores, + Map profiles) throws IOException { + if (memory < -1 || memory == 0) { + throw new IOException( + "Container memory '" + memory + "' has to be greated than 0"); + } + if (vcores < -1 || vcores == 0) { + throw new IOException( + "Container vcores '" + vcores + "' has to be greated than 0"); + } + if (profiles == null) { + containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory; + containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 300ea67..790b6cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -1326,4 +1326,32 @@ private int verifyContainerLog(int containerNum, } return numOfWords; } + + @Test + public void testDistributedShellResourceProfiles() throws Exception { + String[][] args = { + { "--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", + "maximum" }, + { "--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", + "default" }, + { "--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", + "default", "--container_resource_profile", "maximum" } + }; + + for (int i = 0; i < args.length; ++i) { + LOG.info("Initializing DS Client"); + Client client = new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args[i])); + LOG.info("Running DS Client"); + try { + client.run(); + Assert.fail("Client run should throw error"); + } catch (Exception e) { + continue; + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 85d77bd..b23b579 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -115,6 +115,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; +import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -912,6 +913,7 @@ public void signalToContainer(ContainerId containerId, @Override public Resource getResourceProfile(String profile) throws YarnException, IOException { + GetResourceProfileRequest request = GetResourceProfileRequest.newInstance(profile); return rmClient.getResourceProfile(request).getResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 7ef4d47..7058717 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.util.resource; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; @@ -52,6 +54,8 @@ public class DominantResourceCalculator extends ResourceCalculator { private Set resourceNames; + private static final Log LOG = LogFactory.getLog(DominantResourceCalculator.class); + public DominantResourceCalculator() { resourceNames = ResourceUtils.getResourceTypes().keySet(); @@ -262,6 +266,7 @@ public Resource divideAndCeil(Resource numerator, long denominator) { public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { try { ResourceInformation rResourceInformation = @@ -292,6 +297,8 @@ public Resource normalize(Resource r, Resource minimumResource, if (stepFactorValue != 0) { value = roundUp(value, stepFactorValue); } + LOG.debug("for resource " + resource + "; rvalue = " + rValue + "; min = " + minimumValue + "; value = " + value + "; maximumValue = " + maximumValue); + tmp.setValue(Math.min(value, maximumValue)); ret.setResourceInformation(resource, tmp); } catch (YarnException ye) { @@ -299,6 +306,8 @@ public Resource normalize(Resource r, Resource minimumResource, "Error getting resource information for " + resource, ye); } } + LOG.debug("normalized " + r + " to " + ret); + LOG.debug("minimumResource: " + minimumResource + "; maximumResource: " + maximumResource); return ret; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c5ffee6..cc2a489 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; +import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -1838,7 +1839,8 @@ public GetResourceProfileResponse getResourceProfile( .getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED); if (!resourceProfilesEnabled) { - throw new YarnException("Resource profiles are not enabled"); + throw new ResourceProfilesNotEnabledException( + "Resource profiles are not enabled"); } return resourceProfilesManager.getResourceProfiles(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index e566243..490f009 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -354,6 +354,7 @@ private RMAppImpl createAndPopulateNewRMApp( ApplicationId applicationId = submissionContext.getApplicationId(); ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); + LOG.debug("amReq = " + amReq); // Verify and get the update application priority and set back to // submissionContext @@ -454,6 +455,7 @@ private ResourceRequest validateAndCreateResourceRequest( SchedulerUtils.normalizeAndValidateRequest(amReq, scheduler.getMaximumResourceCapability(), submissionContext.getQueue(), scheduler, isRecovery, rmContext); + LOG.debug("normalizeandvalidate " + amReq); } catch (InvalidResourceRequestException e) { LOG.warn("RM app submission failed in validating AM resource request" + " for application " + submissionContext.getApplicationId(), e); @@ -465,6 +467,7 @@ private ResourceRequest validateAndCreateResourceRequest( scheduler.getMinimumResourceCapability(), scheduler.getMaximumResourceCapability(), scheduler.getMinimumResourceCapability()); + LOG.debug("normalize " + amReq); return amReq; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java index 15479e0..8839bf9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java @@ -49,9 +49,9 @@ private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); private static final String VCORES = ResourceInformation.VCORES.getName(); - private static final String DEFAULT_PROFILE = "default"; - private static final String MINIMUM_PROFILE = "minimum"; - private static final String MAXIMUM_PROFILE = "maximum"; + public static final String DEFAULT_PROFILE = "default"; + public static final String MINIMUM_PROFILE = "minimum"; + public static final String MAXIMUM_PROFILE = "maximum"; private static final String[] MANDATORY_PROFILES = { DEFAULT_PROFILE, MINIMUM_PROFILE, MAXIMUM_PROFILE }; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 80811b1..995b674 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -417,6 +417,7 @@ public boolean updateResourceRequests(List requests, // Update asks asks.put(resourceName, request); + LOG.debug("added request " + request); if (resourceName.equals(ResourceRequest.ANY)) { //update the applications requested labels set diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 769a9e6..62a4d3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -219,9 +219,15 @@ public Resource getMaxAllowedAllocation() { return configuredMaxAllocation; } - return Resources.createResource( - Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory), - Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)); + Resource ret = Resources.clone(configuredMaxAllocation); + if(ret.getMemorySize() > maxNodeMemory) { + ret.setMemorySize(maxNodeMemory); + } + if(ret.getVirtualCores() > maxNodeVCores) { + ret.setVirtualCores(maxNodeVCores); + } + + return ret; } finally { readLock.unlock(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4c13a67..bb2c28d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1122,6 +1122,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, getClusterResource(), getMinimumResourceCapability(), getMaximumResourceCapability()); + for(ResourceRequest req: ask) { + LOG.debug("capacity allocate req " + req); + } + Allocation allocation; // make sure we aren't stopping/removing the application