diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 3644761..66ef33e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -199,8 +199,6 @@
   // Shell command to be executed
   private String shellCommand = "";
-  // Args to be passed to the shell command
-  private String shellArgs = "";
   // Env variables to be setup for the shell command
   private Map<String, String> shellEnv = new HashMap<String, String>();
@@ -306,7 +304,6 @@ public boolean init(String[] args) throws ParseException, IOException {
         "Shell command to be executed by the Application Master");
     opts.addOption("shell_script", true,
         "Location of the shell script to be executed");
-    opts.addOption("shell_args", true, "Command line args for the shell script");
     opts.addOption("shell_env", true,
         "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("container_memory", true,
@@ -380,9 +377,6 @@ public boolean init(String[] args) throws ParseException, IOException {
     }
 
     shellCommand = cliParser.getOptionValue("shell_command");
-    if (cliParser.hasOption("shell_args")) {
-      shellArgs = cliParser.getOptionValue("shell_args");
-    }
     if (cliParser.hasOption("shell_env")) {
       String shellEnvs[] = cliParser.getOptionValues("shell_env");
       for (String env : shellEnvs) {
@@ -837,30 +831,40 @@ public void run() {
       }
       ctx.setLocalResources(localResources);
 
-      // Set the necessary command to execute on the allocated container
-      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
-
-      // Set executable command
-      vargs.add(shellCommand);
-      // Set shell script path
-      if (!shellScriptPath.isEmpty()) {
-        vargs.add(ExecShellStringPath);
-      }
+      String[] shellCommands = shellCommand.split(";");
+      List<String> commands = new ArrayList<String>();
+      for (int i = 0; i < shellCommands.length; i++) {
+        // Set the necessary command to execute on the allocated container
+        Vector<CharSequence> vargs = new Vector<CharSequence>();
+
+        // Set executable command
+        vargs.add(shellCommands[i]);
+        // Set shell script path
+        if (i == shellCommands.length - 1) {
+          if (!shellScriptPath.isEmpty()) {
+            vargs.add(ExecShellStringPath);
+          }
+        }
 
-      // Set args for the shell command if any
-      vargs.add(shellArgs);
-      // Add log redirect params
-      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+        // Add log redirect params
+        if (i == 0) {
+          vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+          vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+        } else {
+          vargs.add("1>>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+          vargs.add("2>>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+        }
 
-      // Get final commmand
-      StringBuilder command = new StringBuilder();
-      for (CharSequence str : vargs) {
-        command.append(str).append(" ");
+        // Get final commmand
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+          command.append(str).append(" ");
+        }
+        if (i < shellCommands.length - 1) {
+          command.append(";");
+        }
+        commands.add(command.toString());
       }
-
-      List<String> commands = new ArrayList<String>();
-      commands.add(command.toString());
       ctx.setCommands(commands);
 
       // Set up tokens for the container too. Today, for normal shell commands,
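The run() hunk above replaces the single fixed vargs list with a loop over the ';'-separated sub-commands. Below is a minimal, self-contained sketch of that assembly logic; the class name, buildCommands(), and the two constants are hypothetical stand-ins (the real code uses ApplicationConstants.LOG_DIR_EXPANSION_VAR and the localized ExecShellStringPath), but the splitting, last-command script handling, and the '>' versus '>>' redirect choice mirror the patch.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the per-sub-command assembly done in ApplicationMaster.run().
public class CommandAssemblySketch {

  // Hypothetical stand-ins for ApplicationConstants.LOG_DIR_EXPANSION_VAR and
  // the localized shell-script path (ExecShellStringPath) used by the real AM.
  private static final String LOG_DIR = "<LOG_DIR>";
  private static final String EXEC_SHELL_STRING_PATH = "ExecShellScript.sh";

  static List<String> buildCommands(String shellCommand, String shellScriptPath) {
    String[] shellCommands = shellCommand.split(";");
    List<String> commands = new ArrayList<String>();
    for (int i = 0; i < shellCommands.length; i++) {
      StringBuilder command = new StringBuilder(shellCommands[i]);
      // The shell script, if any, is appended only to the last sub-command.
      if (i == shellCommands.length - 1 && !shellScriptPath.isEmpty()) {
        command.append(" ").append(EXEC_SHELL_STRING_PATH);
      }
      // The first sub-command creates stdout/stderr; later ones append to them.
      String redirect = (i == 0) ? ">" : ">>";
      command.append(" 1").append(redirect).append(LOG_DIR).append("/stdout");
      command.append(" 2").append(redirect).append(LOG_DIR).append("/stderr");
      // Separate consecutive container commands with ';', as the patch does.
      if (i < shellCommands.length - 1) {
        command.append(" ;");
      }
      commands.add(command.toString());
    }
    return commands;
  }

  public static void main(String[] args) {
    for (String c : buildCommands("echo YARN MAPREDUCE;date +%y-%m-%d", "")) {
      System.out.println(c);
    }
  }
}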
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 199a16d..175069b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -30,6 +30,7 @@
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -130,11 +131,9 @@
   private final String appMasterMainClass;
 
   // Shell command to be executed
-  private String shellCommand = "";
+  private String[] shellCommands = new String[] {};
   // Location of shell script
-  private String shellScriptPath = "";
-  // Args to be passed to the shell command
-  private String shellArgs = "";
+  private String shellScriptPath = "";
   // Env variables to be setup for the shell command
   private Map<String, String> shellEnv = new HashMap<String, String>();
   // Shell Command Container priority
@@ -214,9 +213,24 @@ public Client(Configuration conf) throws Exception {
     opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
     opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
     opts.addOption("jar", true, "Jar file containing the application master");
-    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
-    opts.addOption("shell_script", true, "Location of the shell script to be executed");
-    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_command", true,
+        "Shell command to be executed by the Application Master. " +
+        "Does not support multiple --shell_command options. " +
+        "For multiple shell commands and command pipeline, " +
+        "you can create a shell script and use --shell_script option. " +
+        "OR, you can append commands separated by ';' for running multiple " +
+        "commands, such as --shell_command \"command1 arg1 arg2 ; " +
+        "command2 arg3 arg4; command3 arg5 arg6\", append commands " +
+        "separated by '|' for command pipeline such as --shell_command " +
+        "\"command1 arg1 arg2 | command2 arg3 arg4 | command3 arg5 arg6\".");
+    opts.getOption("shell_command").setArgs(Option.UNLIMITED_VALUES);
+    opts.addOption("shell_script", true, "Location of the shell script to " +
+        "be executed. Support only one --shell_script option " +
+        "with only one shell script specified. " +
+        "For multiple shell scripts, combine them " +
+        "into one shell script. " +
" + + "If there are multiple shell commands specified in --shell_command, " + + "the shell script is only applied to the last shell command"); opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); @@ -249,6 +263,17 @@ private void printUsage() { */ public boolean init(String[] args) throws ParseException { + if (numOfTargetOpts(args, "--shell_command") > 1) { + throw new IllegalArgumentException("DistributedShell does not support " + + "multiple shell commands. Please create a shell script " + + "and use --shell_script option."); + } + + if (numOfTargetOpts(args, "--shell_script") > 1) { + throw new IllegalArgumentException("DistributedShell does not support " + + "multiple shell scripts. Please combine them into one shell script."); + } + CommandLine cliParser = new GnuParser().parse(opts, args); if (args.length == 0) { @@ -289,14 +314,11 @@ public boolean init(String[] args) throws ParseException { if (!cliParser.hasOption("shell_command")) { throw new IllegalArgumentException("No shell command specified to be executed by application master"); } - shellCommand = cliParser.getOptionValue("shell_command"); + shellCommands = cliParser.getOptionValues("shell_command"); if (cliParser.hasOption("shell_script")) { shellScriptPath = cliParser.getOptionValue("shell_script"); } - if (cliParser.hasOption("shell_args")) { - shellArgs = cliParser.getOptionValue("shell_args"); - } if (cliParser.hasOption("shell_env")) { String envs[] = cliParser.getOptionValues("shell_env"); for (String env : envs) { @@ -541,11 +563,15 @@ public boolean run() throws IOException, YarnException { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); - if (!shellCommand.isEmpty()) { - vargs.add("--shell_command " + shellCommand + ""); - } - if (!shellArgs.isEmpty()) { - vargs.add("--shell_args " + shellArgs + ""); + if (shellCommands.length > 0) { + StringBuilder command = new StringBuilder(); + command.append("'"); + for(String shellCommand : shellCommands) { + command.append(shellCommand); + command.append(" "); + } + command.append("'"); + vargs.add("--shell_command " + command + ""); } for (Map.Entry entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); @@ -715,4 +741,13 @@ private void forceKillApplication(ApplicationId appId) yarnClient.killApplication(appId); } + private int numOfTargetOpts(String[] args, String targetArgs) { + int num = 0; + for (String arg : args) { + if (arg.equals(targetArgs)) { + num ++; + } + } + return num; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 2f311b5..ae17d83 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 2f311b5..ae17d83 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -18,12 +18,17 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +76,7 @@ public static void setup() throws Exception {
     yarnCluster.start();
     NodeManager nm = yarnCluster.getNodeManager(0);
     waitForNMToRegister(nm);
-    
+
     URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
     if (url == null) {
       throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
@@ -171,6 +176,80 @@ public void run() {
   }
 
   @Test(timeout=90000)
+  public void testDSShellWithMultipleShellCommands() throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        "echo YARN MAPREDUCE;date +%y-%m-%d",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    Assert.assertTrue(result);
+
+    List<List<String>> outputList = getOutputFromContainerLog(2, 2);
+    Date dNow = new Date();
+    SimpleDateFormat ft = new SimpleDateFormat("yy-MM-dd");
+    String dateNow = ft.format(dNow);
+
+    for (List<String> strs : outputList) {
+      Assert.assertTrue(strs.get(0).trim().equals("YARN MAPREDUCE"));
+      Assert.assertTrue(strs.get(1).trim().equals(dateNow));
+    }
+  }
+
+  @Test(timeout=90000)
+  public void testDSShellWithPipeline() throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        "echo YARN MAPREDUCE HDFS | grep -o MAPREDUCE",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    Assert.assertTrue(result);
+
+    List<List<String>> outputList = getOutputFromContainerLog(2, 1);
+
+    for (List<String> strs : outputList) {
+      Assert.assertTrue(strs.get(0).trim().equals("MAPREDUCE"));
+    }
+  }
+
+  @Test(timeout=90000)
   public void testDSShellWithInvalidArgs() throws Exception {
 
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
@@ -266,6 +345,67 @@ public void testDSShellWithInvalidArgs() throws Exception {
       Assert.assertTrue("The throw exception is not expected",
           e.getMessage().contains("Invalid virtual cores specified"));
     }
+
+    LOG.info("Initializing DS Client with multiple --shell_command options");
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "2",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
"dir" : "ls", + "--shell_command", + "pwd", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("DistributedShell does not support " + + "multiple shell commands. Please create a shell script " + + "and use --shell_script option.")); + } + + LOG.info("Initializing DS Client with multiple --shell_script options"); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--shell_script", + "/tmp/script1.sh", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--shell_script", + "/tmp/script2.sh" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("DistributedShell does not support " + + "multiple shell scripts. Please combine them into " + + "one shell script.")); + } + } protected static void waitForNMToRegister(NodeManager nm) @@ -306,5 +446,49 @@ public void testContainerLaunchFailureHandling() throws Exception { } + private List> getOutputFromContainerLog(int containerNum, + int estimateOutputSizePerLog) { + File logFolder = + new File(yarnCluster.getNodeManager(0).getConfig() + .get(YarnConfiguration.NM_LOG_DIRS, + YarnConfiguration.DEFAULT_NM_LOG_DIRS)); + File[] listOfFiles = logFolder.listFiles(); + File[] containerFiles = listOfFiles[listOfFiles.length - 1].listFiles(); + // verify there exists a AMContainer log and all the containers log + Assert.assertTrue(containerFiles.length == (containerNum + 1)); + List> outputList = new ArrayList>(); + for (int i = 1; i < containerFiles.length; i++) { + for (File output : containerFiles[i].listFiles()) { + if (output.getName().contains("stdout")) { + BufferedReader br = null; + List outStrings = new ArrayList(); + try { + + String sCurrentLine; + + br = new BufferedReader(new FileReader(output)); + + while ((sCurrentLine = br.readLine()) != null) { + outStrings.add(sCurrentLine); + } + + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (br != null) + br.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + Assert.assertTrue(outStrings.size() == estimateOutputSizePerLog); + outputList.add(outStrings); + } + } + } + Assert.assertTrue(outputList.size() == containerNum); + return outputList; + } }