diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 91dbc00..7937a66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -239,6 +240,8 @@ // VirtualCores to request for the container on which the shell command will run private static final int DEFAULT_CONTAINER_VCORES = 1; private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // All other resources to request for the container on which the shell command will run + private Map containerResources; // Priority of the request private int requestPriority; @@ -416,6 +419,10 @@ public boolean init(String[] args) throws ParseException, IOException { "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resources", true, + "Amount of resources to be requested to run the shell command. " + + "Specified as resource type=value pairs. " + + "E.g. -container_resources memory-mb=512 vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile to be requested to run the shell command"); opts.addOption("num_containers", true, @@ -562,6 +569,14 @@ public boolean init(String[] args) throws ParseException, IOException { "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( "container_vcores", "-1")); + if (cliParser.hasOption("container_resources")) { + for (String resource : cliParser.getOptionValues("container_resources")) { + String[] splits = resource.trim().split("="); + String key = splits[0]; + long value = Long.parseLong(splits[1]); + containerResources.put(key, value); + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numTotalContainers = Integer.parseInt(cliParser.getOptionValue( @@ -1528,6 +1543,9 @@ private ProfileCapability createProfileCapability() containerVirtualCores; resourceCapability.setMemorySize(containerMemory); resourceCapability.setVirtualCores(containerVirtualCores); + for (Map.Entry entry : containerResources.entrySet()) { + resourceCapability.setResourceValue(entry.getKey(), entry.getValue()); + } } String profileName = containerResourceProfile; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0582afe..cba8e93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.Vector; +import com.google.common.base.Joiner; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -69,7 +70,9 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -79,6 +82,7 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -142,7 +146,8 @@ private long amMemory = DEFAULT_AM_MEMORY; // Amt. of virtual core resource to request for to run the App Master private int amVCores = DEFAULT_AM_VCORES; - + // Amount of resources to request to run the App Master + private Map amResources = new HashMap<>(); // AM resource profile private String amResourceProfile = ""; @@ -166,6 +171,8 @@ private long containerMemory = DEFAULT_CONTAINER_MEMORY; // Amt. of virtual cores to request for container in which shell script will be executed private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; + // Amount of resources to request for container in which shell script will be executed + private Map containerResources = new HashMap<>(); // container resource profile private String containerResourceProfile = ""; // No. of containers in which the shell script needs to be executed @@ -269,6 +276,9 @@ public Client(Configuration conf) throws Exception { opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); + opts.addOption("master_resources", true, "Amount of resources to be requested to run the application master. " + + "Specified as resource type=value pairs. " + + "E.g. -master_resources memory-mb=512 vcores=2"); opts.addOption("jar", true, "Jar file containing the application master"); opts.addOption("master_resource_profile", true, "Resource profile for the application master"); opts.addOption("shell_command", true, "Shell command to be executed by " + @@ -284,6 +294,9 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("container_resources", true, "Amount of resources to be requested to run the shell command. " + + "Specified as resource type=value pairs. " + + "E.g. -container_resources memory-mb=512 vcores=1"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("log_properties", true, "log4j.properties file"); @@ -392,6 +405,26 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("master_memory", "-1")); amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); + if (cliParser.hasOption("master_resources")) { + for (String resource : cliParser.getOptionValues("master_resources")) { + resource = resource.trim(); + if (!resource.matches("^[^=]+=\\d+$")) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs."); + } + String[] splits = resource.split("="); + String key = splits[0]; + long value = Long.parseLong(splits[1]); + if (key.equals(ResourceInformation.MEMORY_URI)) { + amMemory = value; + } else if (key.equals(ResourceInformation.VCORES_URI)) { + amVCores = (int)value; + } else { + amResources.put(key, value); + } + } + } amResourceProfile = cliParser.getOptionValue("master_resource_profile", ""); if (!cliParser.hasOption("jar")) { @@ -437,6 +470,26 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); + if (cliParser.hasOption("container_resources")) { + for (String resource : cliParser.getOptionValues("container_resources")) { + resource = resource.trim(); + if (!resource.matches("^[^=]+=\\d+$")) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs."); + } + String[] splits = resource.split("="); + String key = splits[0]; + long value = Long.parseLong(splits[1]); + if (key.equals(ResourceInformation.MEMORY_URI)) { + containerMemory = value; + } else if (key.equals(ResourceInformation.VCORES_URI)) { + containerVirtualCores = (int)value; + } else { + containerResources.put(key, value); + } + } + } containerResourceProfile = cliParser.getOptionValue("container_resource_profile", ""); numContainers = @@ -613,9 +666,9 @@ public boolean run() throws IOException, YarnException { // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile, - amPriority, profiles); - setContainerResources(containerMemory, containerVirtualCores, profiles); + List resourceTypes = yarnClient.getResourceTypeInfo(); + setAMResourceCapability(appContext, profiles, resourceTypes); + setContainerResources(profiles, resourceTypes); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); @@ -746,6 +799,10 @@ public boolean run() throws IOException, YarnException { if (containerVirtualCores > 0) { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); } + if (!containerResources.isEmpty()) { + Joiner.MapJoiner joiner = Joiner.on(' ').withKeyValueSeparator("="); + vargs.add("--container_resources " + joiner.join(containerResources)); + } if (containerResourceProfile != null && !containerResourceProfile .isEmpty()) { vargs.add("--container_resource_profile " + containerResourceProfile); @@ -981,25 +1038,25 @@ private void prepareTimelineDomain() { } private void setAMResourceCapability(ApplicationSubmissionContext appContext, - long memory, int vcores, String profile, int priority, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { + Map profiles, List resourceTypes) + throws IllegalArgumentException, IOException, YarnException { + if (amMemory < -1 || amMemory == 0) { throw new IllegalArgumentException("Invalid memory specified for" - + " application master, exiting. Specified memory=" + memory); + + " application master, exiting. Specified memory=" + amMemory); } - if (vcores < -1 || vcores == 0) { + if (amVCores < -1 || amVCores == 0) { throw new IllegalArgumentException("Invalid virtual cores specified for" - + " application master, exiting. Specified virtual cores=" + vcores); + + " application master, exiting. Specified virtual cores=" + amVCores); } - String tmp = profile; - if (profile.isEmpty()) { + String tmp = amResourceProfile; + if (amResourceProfile.isEmpty()) { tmp = "default"; } if (appContext.getAMContainerResourceRequests() == null) { List amResourceRequests = new ArrayList(); amResourceRequests - .add(ResourceRequest.newInstance(Priority.newInstance(priority), "*", - Resources.clone(Resources.none()), 1)); + .add(ResourceRequest.newInstance(Priority.newInstance(amPriority), + "*", Resources.clone(Resources.none()), 1)); appContext.setAMContainerResourceRequests(amResourceRequests); } @@ -1008,36 +1065,55 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.getAMContainerResourceRequests().get(0).setProfileCapability( ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); } + Resource capability = Resource.newInstance(0, 0); + + validateResourceTypes(amResources.keySet(), resourceTypes); + for (Map.Entry entry : amResources.entrySet()) { + capability.setResourceValue(entry.getKey(), entry.getValue()); + } // set amMemory because it's used to set Xmx param - if (profiles == null) { - amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; - amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores; - capability.setMemorySize(amMemory); - capability.setVirtualCores(amVCores); - } else { - amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory; - amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores; - capability.setMemorySize(memory); - capability.setVirtualCores(vcores); + if (amMemory == -1) { + amMemory = (profiles == null) ? DEFAULT_AM_MEMORY : + profiles.get(tmp).getMemorySize(); + } + if (amVCores == -1) { + amVCores = (profiles == null) ? DEFAULT_AM_VCORES : + profiles.get(tmp).getVirtualCores(); } + capability.setMemorySize(amMemory); + capability.setVirtualCores(amVCores); appContext.getAMContainerResourceRequests().get(0).getProfileCapability() .setProfileCapabilityOverride(capability); } - private void setContainerResources(long memory, int vcores, - Map profiles) throws IllegalArgumentException { - if (memory < -1 || memory == 0) { - throw new IllegalArgumentException( - "Container memory '" + memory + "' has to be greated than 0"); + private void setContainerResources(Map profiles, + List resourceTypes) throws IllegalArgumentException { + if (containerMemory < -1 || containerMemory == 0) { + throw new IllegalArgumentException("Container memory '" + + containerMemory + "' has to be greated than 0"); } - if (vcores < -1 || vcores == 0) { - throw new IllegalArgumentException( - "Container vcores '" + vcores + "' has to be greated than 0"); + if (containerVirtualCores < -1 || containerVirtualCores == 0) { + throw new IllegalArgumentException("Container vcores '" + + containerVirtualCores + "' has to be greated than 0"); } + validateResourceTypes(containerResources.keySet(), resourceTypes); if (profiles == null) { - containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory; - containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores; + containerMemory = containerMemory == -1 ? + DEFAULT_CONTAINER_MEMORY : containerMemory; + containerVirtualCores = containerVirtualCores == -1 ? + DEFAULT_CONTAINER_VCORES : containerVirtualCores; + } + } + + private void validateResourceTypes(Iterable resourceNames, + List resourceTypes) { + for (String resourceName : resourceNames) { + if (!resourceTypes.stream().anyMatch(e -> + e.getName().equals(resourceName))) { + throw new ResourceNotFoundException("Unknown resource: " + + resourceName); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 5cf884b..8032a1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -54,11 +54,15 @@ import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -71,6 +75,7 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.client.api.impl.TimelineWriter; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; @@ -94,6 +99,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; +import org.mockito.internal.matchers.Null; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1377,4 +1383,109 @@ public void testDistributedShellResourceProfiles() throws Exception { } } } + + @Test + public void testDistributedShellAMResources() throws Exception { + final long MASTER_MEMORY = 512; + final int MASTER_VCORES = 1; + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + String.valueOf(MASTER_MEMORY), + "--master_resources", + "vcores=" + String.valueOf(MASTER_VCORES), + "--container_resources", + "memory-mb=256", + "--container_vcores", + "1" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(yarnCluster.getConfig())); + yarnClient.start(); + + while (true) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0 ) { + Thread.sleep(10); + continue; + } + ApplicationReport appReport = apps.get(0); + ApplicationId appId = appReport.getApplicationId(); + List appAttempts = + yarnClient.getApplicationAttempts(appId); + if (appAttempts.isEmpty()) { + Thread.sleep(10); + continue; + } + ApplicationAttemptReport appAttemptReport = appAttempts.get(0); + ContainerId amContainerId = appAttemptReport.getAMContainerId(); + if (amContainerId == null) { + Thread.sleep(10); + continue; + } + ContainerReport report = yarnClient.getContainerReport(amContainerId); + Resource resource = report.getAllocatedResource(); + Assert.assertEquals(resource.getMemorySize(), MASTER_MEMORY); + Assert.assertEquals(resource.getVirtualCores(), MASTER_VCORES); + return; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testDistributedShellAMResourcesWithIllegalArguments() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "memory-mb=invalid" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + } + + @Test(expected = ResourceNotFoundException.class) + public void testDistributedShellAMResourcesWithUnknownResource() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_resources", + "unknown-resource=5" + }; + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + client.run(); + } }