diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 91dbc001c5d..926de50ceb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -90,6 +91,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; @@ -241,6 +244,9 @@ private int containerVirtualCores = DEFAULT_CONTAINER_VCORES; // Priority of the request private int requestPriority; + // Execution type of the containers. + // Default GUARANTEED. + private ExecutionType containerType = ExecutionType.GUARANTEED; // Resource profile for the container private String containerResourceProfile = ""; @@ -412,6 +418,8 @@ public boolean init(String[] args) throws ParseException, IOException { "App Attempt ID. Not to be used unless for testing purposes"); opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); + opts.addOption("container_type", true, + "Container execution type, GUARANTEED or OPPORTUNISTIC"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, @@ -558,6 +566,16 @@ public boolean init(String[] args) throws ParseException, IOException { domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); } + if (cliParser.hasOption("container_type")) { + String containerTypeStr = cliParser.getOptionValue("container_type"); + if (Arrays.stream(ExecutionType.values()).noneMatch( + executionType -> executionType.toString() + .equals(containerTypeStr))) { + throw new IllegalArgumentException("Invalid container_type: " + + containerTypeStr); + } + containerType = ExecutionType.valueOf(containerTypeStr); + } containerMemory = Integer.parseInt(cliParser.getOptionValue( "container_memory", "-1")); containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( @@ -1242,7 +1260,9 @@ private ContainerRequest setupContainerAskForRM() { // Set up resource type requirements ContainerRequest request = - new ContainerRequest(createProfileCapability(), null, null, pri); + new ContainerRequest(createProfileCapability(), null, null, + pri, 0, true, null, + ExecutionTypeRequest.newInstance(containerType)); LOG.info("Requested container ask: " + request.toString()); return request; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0582afec261..16bf0fd82e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.Vector; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; @@ -171,6 +173,8 @@ // No. of containers in which the shell script needs to be executed private int numContainers = 1; private String nodeLabelExpression = null; + // Container type, default GUARANTEED. + private ExecutionType containerType = ExecutionType.GUARANTEED; // log4j.properties file // if available, add to local resources and set into classpath @@ -282,6 +286,8 @@ public Client(Configuration conf) throws Exception { opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); + opts.addOption("container_type", true, + "Container execution type, GUARANTEED or OPPORTUNISTIC"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); opts.addOption("container_resource_profile", true, "Resource profile for the shell command"); @@ -433,6 +439,16 @@ public boolean init(String[] args) throws ParseException { } shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); + if (cliParser.hasOption("container_type")) { + String containerTypeStr = cliParser.getOptionValue("container_type"); + if (Arrays.stream(ExecutionType.values()).noneMatch( + executionType -> executionType.toString() + .equals(containerTypeStr))) { + throw new IllegalArgumentException("Invalid container_type: " + + containerTypeStr); + } + containerType = ExecutionType.valueOf(containerTypeStr); + } containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "-1")); containerVirtualCores = @@ -740,6 +756,9 @@ public boolean run() throws IOException, YarnException { // Set class name vargs.add(appMasterMainClass); // Set params for Application Master + if (containerType != null) { + vargs.add("--container_type " + String.valueOf(containerType)); + } if (containerMemory > 0) { vargs.add("--container_memory " + String.valueOf(containerMemory)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 5cf884b6470..d6bb8d6444a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -1179,6 +1179,33 @@ public void testDSShellWithInvalidArgs() throws Exception { e.getMessage().contains("No shell command or shell script specified " + "to be executed by application master")); } + + LOG.info("Initializing DS Client with invalid container_type argument"); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "UNSUPPORTED_TYPE" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE")); + } } @Test @@ -1377,4 +1404,33 @@ public void testDistributedShellResourceProfiles() throws Exception { } } } + + @Test + public void testDSShellWithOpportunisticContainers() throws Exception { + Client client = new Client(new Configuration(yarnCluster.getConfig())); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_command", + "date", + "--container_type", + "OPPORTUNISTIC" + }; + client.init(args); + client.run(); + } catch (Exception e) { + Assert.fail("Job execution with opportunistic containers failed."); + } + } }