diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 926de50ceb1..b2692b96a81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -242,6 +243,9 @@ // VirtualCores to request for the container on which the shell command will run private static final int DEFAULT_CONTAINER_VCORES = 1; private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // All other resources to request for the container + // on which the shell command will run + private Map containerResources = new HashMap<>(); // Priority of the request private int requestPriority; // Execution type of the containers. @@ -424,6 +428,10 @@ public boolean init(String[] args) throws ParseException, IOException { "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resources", true, + "Amount of resources to be requested to run the shell command. " + + "Specified as resource type=value pairs separated by commas. " + + "E.g. -container_resources memory-mb=512,vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile to be requested to run the shell command"); opts.addOption("num_containers", true, @@ -580,6 +588,14 @@ public boolean init(String[] args) throws ParseException, IOException { "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( "container_vcores", "-1")); + containerResources = new HashMap<>(); + if (cliParser.hasOption("container_resources")) { + Map container_resources = Client.parseResourcesString( + cliParser.getOptionValue("container_resources")); + for (Map.Entry entry : container_resources.entrySet()) { + containerResources.put(entry.getKey(), entry.getValue()); + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( @@ -701,6 +717,7 @@ public void run() throws YarnException, IOException, InterruptedException { .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); resourceProfiles = response.getResourceProfiles(); + ResourceUtils.reinitializeResources(response.getResourceTypes()); // Dump out information about cluster capability as seen by the // resource manager long maxMem = response.getMaximumResourceCapability().getMemorySize(); @@ -1548,6 +1565,9 @@ private ProfileCapability createProfileCapability() containerVirtualCores; resourceCapability.setMemorySize(containerMemory); resourceCapability.setVirtualCores(containerVirtualCores); + for (Map.Entry entry : containerResources.entrySet()) { + resourceCapability.setResourceValue(entry.getKey(), entry.getValue()); + } } String profileName = containerResourceProfile; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 16bf0fd82e5..8dfc62ff3d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -29,6 +29,7 @@ import java.util.Vector; import java.util.Arrays; +import com.google.common.base.Joiner; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -70,7 +71,9 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -81,8 +84,11 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; @@ -144,7 +150,8 @@ private long amMemory = DEFAULT_AM_MEMORY; // Amt. of virtual core resource to request for to run the App Master private int amVCores = DEFAULT_AM_VCORES; - + // Amount of resources to request to run the App Master + private Map amResources = new HashMap<>(); // AM resource profile private String amResourceProfile = ""; @@ -168,6 +175,9 @@ private long containerMemory = DEFAULT_CONTAINER_MEMORY; // Amt. of virtual cores to request for container in which shell script will be executed private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // Amt. of resources to request for container + // in which shell script will be executed + private Map containerResources = new HashMap<>(); // container resource profile private String containerResourceProfile = ""; // No. of containers in which the shell script needs to be executed @@ -263,6 +273,8 @@ public Client(Configuration conf) throws Exception { Client(String appMasterMainClass, Configuration conf) { this.conf = conf; + this.conf.setBoolean( + YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true); this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); @@ -272,7 +284,12 @@ public Client(Configuration conf) throws Exception { opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); - opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); + opts.addOption("master_vcores", true, "Amount of virtual cores " + + "to be requested to run the application master"); + opts.addOption("master_resources", true, "Amount of resources " + + "to be requested to run the application master. " + + "Specified as resource type=value pairs separated by commas." + + "E.g. -master_resources memory-mb=512,vcores=2"); opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("master_resource_profile", true, "Resource profile for the application master"); opts.addOption("shell_command", true, "Shell command to be executed by " + @@ -288,8 +305,14 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_type", true, "Container execution type, GUARANTEED or OPPORTUNISTIC"); - opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); - opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_memory", true, "Amount of memory in MB " + + "to be requested to run the shell command"); + opts.addOption("container_vcores", true, "Amount of virtual cores " + + "to be requested to run the shell command"); + opts.addOption("container_resources", true, "Amount of resources " + + "to be requested to run the shell command. " + + "Specified as resource type=value pairs separated by commas. " + + "E.g. -container_resources memory-mb=256,vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); @@ -398,6 +421,19 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); + if (cliParser.hasOption("master_resources")) { + Map master_resources = + parseResourcesString(cliParser.getOptionValue("master_resources")); + for (Map.Entry entry : master_resources.entrySet()) { + if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { + amMemory = entry.getValue(); + } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { + amVCores = entry.getValue().intValue(); + } else { + amResources.put(entry.getKey(), entry.getValue()); + } + } + } amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); if (!cliParser.hasOption("jar")) { @@ -453,6 +489,19 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); + if (cliParser.hasOption("container_resources")) { + Map container_resources = + parseResourcesString(cliParser.getOptionValue("container_resources")); + for (Map.Entry entry : container_resources.entrySet()) { + if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { + containerMemory = entry.getValue(); + } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) { + containerVirtualCores = entry.getValue().intValue(); + } else { + containerResources.put(entry.getKey(), entry.getValue()); + } + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numContainers = @@ -629,9 +678,9 @@ public boolean run() throws IOException, YarnException { // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile, - amPriority, profiles); - setContainerResources(containerMemory, containerVirtualCores, profiles); + List resourceTypes = yarnClient.getResourceTypeInfo(); + setAMResourceCapability(appContext, profiles, resourceTypes); + setContainerResources(profiles, resourceTypes); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); @@ -765,6 +814,10 @@ public boolean run() throws IOException, YarnException { if (containerVirtualCores > 0) { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); } + if (!containerResources.isEmpty()) { + Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("="); + vargs.add("--container_resources " + joiner.join(containerResources)); + } if (containerResourceProfile != null && !containerResourceProfile .isEmpty()) { vargs.add("--container_resource_profile " + containerResourceProfile); @@ -1000,25 +1053,26 @@ private void prepareTimelineDomain() { } private void setAMResourceCapability(ApplicationSubmissionContext appContext, - long memory, int vcores, String profile, int priority, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { + Map profiles, List resourceTypes) + throws IllegalArgumentException, IOException, YarnException { + if (amMemory < -1 || amMemory == 0) { throw new IllegalArgumentException("Invalid memory specified for" - + " application master, exiting. Specified memory=" + memory); + + " application master, exiting. Specified memory=" + amMemory); } - if (vcores < -1 || vcores == 0) { + if (amVCores < -1 || amVCores == 0) { throw new IllegalArgumentException("Invalid virtual cores specified for" - + " application master, exiting. Specified virtual cores=" + vcores); + + " application master, exiting. " + + "Specified virtual cores=" + amVCores); } - String tmp = profile; - if (profile.isEmpty()) { + String tmp = amResourceProfile; + if (amResourceProfile.isEmpty()) { tmp = "default"; } if (appContext.getAMContainerResourceRequests() == null) { List amResourceRequests = new ArrayList(); amResourceRequests - .add(ResourceRequest.newInstance(Priority.newInstance(priority), "*", - Resources.clone(Resources.none()), 1)); + .add(ResourceRequest.newInstance(Priority.newInstance(amPriority), + "*", Resources.clone(Resources.none()), 1)); appContext.setAMContainerResourceRequests(amResourceRequests); } @@ -1027,36 +1081,90 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.getAMContainerResourceRequests().get(0).setProfileCapability( ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); } + Resource capability = Resource.newInstance(0, 0); + + validateResourceTypes(amResources.keySet(), resourceTypes); + for (Map.Entry entry : amResources.entrySet()) { + capability.setResourceValue(entry.getKey(), entry.getValue()); + } // set amMemory because it's used to set Xmx param - if (profiles == null) { - amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; - amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores; - capability.setMemorySize(amMemory); - capability.setVirtualCores(amVCores); - } else { - amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory; - amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores; - capability.setMemorySize(memory); - capability.setVirtualCores(vcores); + if (amMemory == -1) { + amMemory = (profiles == null) ? DEFAULT_AM_MEMORY : + profiles.get(tmp).getMemorySize(); + } + if (amVCores == -1) { + amVCores = (profiles == null) ? DEFAULT_AM_VCORES : + profiles.get(tmp).getVirtualCores(); } + capability.setMemorySize(amMemory); + capability.setVirtualCores(amVCores); appContext.getAMContainerResourceRequests().get(0).getProfileCapability() .setProfileCapabilityOverride(capability); } - private void setContainerResources(long memory, int vcores, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { - throw new IllegalArgumentException( - "Container memory '" + memory + "' has to be greated than 0"); + private void setContainerResources(Map profiles, + List resourceTypes) throws IllegalArgumentException { + if (containerMemory < -1 || containerMemory == 0) { + throw new IllegalArgumentException("Container memory '" + + containerMemory + "' has to be greated than 0"); } - if (vcores < -1 || vcores == 0) { - throw new IllegalArgumentException( - "Container vcores '" + vcores + "' has to be greated than 0"); + if (containerVirtualCores < -1 || containerVirtualCores == 0) { + throw new IllegalArgumentException("Container vcores '" + + containerVirtualCores + "' has to be greated than 0"); } + validateResourceTypes(containerResources.keySet(), resourceTypes); if (profiles == null) { - containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory; - containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores; + containerMemory = containerMemory == -1 ? + DEFAULT_CONTAINER_MEMORY : containerMemory; + containerVirtualCores = containerVirtualCores == -1 ? + DEFAULT_CONTAINER_VCORES : containerVirtualCores; + } + } + + private void validateResourceTypes(Iterable resourceNames, + List resourceTypes) { + for (String resourceName : resourceNames) { + if (!resourceTypes.stream().anyMatch(e -> + e.getName().equals(resourceName))) { + throw new ResourceNotFoundException("Unknown resource: " + + resourceName); + } + } + } + + static Map parseResourcesString(String resourcesStr) { + Map resources = new HashMap<>(); + + // Ignore the grouping "[]" + if (resourcesStr.startsWith("[")) { + resourcesStr = resourcesStr.substring(1); + } + if (resourcesStr.endsWith("]")) { + resourcesStr = resourcesStr.substring(0, resourcesStr.length()); + } + + for (String resource : resourcesStr.trim().split(",")) { + resource = resource.trim(); + if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs separated by commas."); + } + String[] splits = resource.split("="); + String key = splits[0], value = splits[1]; + String units = ResourceUtils.getUnits(value); + String valueWithoutUnit = value.substring( + 0, value.length() - units.length()).trim(); + Long resourceValue = Long.valueOf(valueWithoutUnit); + if (!units.isEmpty()) { + resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); + } + if (key.equals("memory")) { + key = ResourceInformation.MEMORY_URI; + } + resources.put(key, resourceValue); } + return resources; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index d6bb8d6444a..36708d72ece 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -54,11 +54,15 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -71,6 +75,7 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; @@ -1433,4 +1438,102 @@ public void testDSShellWithOpportunisticContainers() throws Exception { Assert.fail("Job execution with opportunistic containers failed."); } } + + @Test + public void testDistributedShellAMResources() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory=1 Gi,vcores=1", + "--container_resources", + "memory=256,vcores=1", + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + + while (true) { + List apps = yarnClient.getApplications(); + if (apps.isEmpty()) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + if (appAttempts.isEmpty()) { + Thread.sleep(10); + continue; + } + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + if (amContainerId == null) { + Thread.sleep(10); + continue; + } + ContainerReport report = yarnClient.getContainerReport(amContainerId); + Resource resource = report.getAllocatedResource(); + Assert.assertEquals(1024, resource.getMemorySize()); + Assert.assertEquals(1, resource.getVirtualCores()); + return; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testDistributedShellAMResourcesWithIllegalArguments() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory-mb=invalid" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDistributedShellAMResourcesWithUnknownResource() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "unknown-resource=5" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + client.run(); + } }