diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 06d96cb..6cf3e2d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -19,8 +19,9 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
 
 import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -376,7 +377,17 @@ public boolean init(String[] args) throws ParseException, IOException {
       throw new IllegalArgumentException(
           "No shell command specified to be executed by application master");
     }
-    shellCommand = cliParser.getOptionValue("shell_command");
+    // --shell_command now carries the *path* of a localized file holding the
+    // command (written by the Client with writeUTF), not the raw command.
+    String shellCommandPath = cliParser.getOptionValue("shell_command");
+    DataInputStream ds = null;
+    try {
+      ds = new DataInputStream(new FileInputStream(shellCommandPath));
+      shellCommand = ds.readUTF();
+    } finally {
+      // Closing the DataInputStream also closes the wrapped FileInputStream.
+      org.apache.commons.io.IOUtils.closeQuietly(ds);
+    }
 
     if (cliParser.hasOption("shell_args")) {
       shellArgs = cliParser.getOptionValue("shell_args");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 199a16d..b40a0f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -32,14 +32,17 @@
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -131,6 +134,8 @@
   // Shell command to be executed
   private String shellCommand = "";
+  // File name (HDFS suffix and local resource key) used to ship the command
+  private final String shellCommandPath = "shellCommands";
   // Location of shell script
   private String shellScriptPath = "";
   // Args to be passed to the shell command
   private String shellArgs = "";
@@ -483,6 +488,32 @@ public boolean run() throws IOException, YarnException {
       hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
     }
 
+    if (!shellCommand.isEmpty()) {
+      // Ship the shell command to the AM through an HDFS file rather than on
+      // the AM command line, so quotes/pipes in the command survive intact.
+      String shellCommandSuffix =
+          appName + "/" + appId.getId() + "/" + shellCommandPath;
+      Path shellCommandDst =
+          new Path(fs.getHomeDirectory(), shellCommandSuffix);
+      FSDataOutputStream ostream = null;
+      try {
+        ostream = FileSystem
+            .create(fs, shellCommandDst, new FsPermission((short) 0710));
+        ostream.writeUTF(shellCommand);
+      } finally {
+        IOUtils.closeQuietly(ostream);
+      }
+      FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
+      LocalResource scRsrc = Records.newRecord(LocalResource.class);
+      scRsrc.setType(LocalResourceType.FILE);
+      scRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+      scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst
+          .toUri()));
+      scRsrc.setTimestamp(scFileStatus.getModificationTime());
+      scRsrc.setSize(scFileStatus.getLen());
+      // Key must match what the AM receives via --shell_command below.
+      localResources.put(shellCommandPath, scRsrc);
+    }
 
     // Set local resource info into app master container launch context
     amContainer.setLocalResources(localResources);
@@ -541,8 +572,9 @@ public boolean run() throws IOException, YarnException {
     vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     vargs.add("--num_containers " + String.valueOf(numContainers));
     vargs.add("--priority " + String.valueOf(shellCmdPriority));
+
     if (!shellCommand.isEmpty()) {
-      vargs.add("--shell_command " + shellCommand + "");
+      vargs.add("--shell_command " + shellCommandPath);
     }
     if (!shellArgs.isEmpty()) {
       vargs.add("--shell_args " + shellArgs + "");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index a8b546d..59148c7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 
 import java.net.URL;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -171,6 +174,38 @@ public void run() {
   }
 
   @Test(timeout=90000)
+  public void testDSShellWithCommands() throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        "echo HADOOP YARN MAPREDUCE|wc -w",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    };
+
+    LOG.info("Initializing DS Client");
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    List<String> expectedContent = new ArrayList<String>();
+    expectedContent.add("3");
+    verifyContainerLog(2, expectedContent);
+  }
+
+  @Test(timeout=90000)
   public void testDSShellWithInvalidArgs() throws Exception {
 
     Client client =
@@ -332,5 +367,59 @@ public void testDebugFlag() throws Exception {
     LOG.info("Running DS Client");
     Assert.assertTrue(client.run());
   }
+
+  /**
+   * Asserts that every expected line appears, in order, in the stdout of the
+   * most recent application's containers (containerNum containers plus the AM).
+   */
+  private void
+      verifyContainerLog(int containerNum, List<String> expectedContent) {
+    File logFolder =
+        new File(yarnCluster.getNodeManager(0).getConfig()
+          .get(YarnConfiguration.NM_LOG_DIRS,
+            YarnConfiguration.DEFAULT_NM_LOG_DIRS));
+
+    File[] listOfFiles = logFolder.listFiles();
+    // Scan backwards for the newest app dir with containerNum + 1 containers.
+    int currentContainerLogFileIndex = -1;
+    for (int i = listOfFiles.length - 1; i >= 0; i--) {
+      if (listOfFiles[i].listFiles().length == containerNum + 1) {
+        currentContainerLogFileIndex = i;
+        break;
+      }
+    }
+    Assert.assertTrue(currentContainerLogFileIndex != -1);
+    File[] containerFiles =
+        listOfFiles[currentContainerLogFileIndex].listFiles();
+
+    for (int i = 0; i < containerFiles.length; i++) {
+      for (File output : containerFiles[i].listFiles()) {
+        if (output.getName().trim().equalsIgnoreCase("stdout")) {
+          BufferedReader br = null;
+          try {
+
+            String sCurrentLine;
+
+            br = new BufferedReader(new FileReader(output));
+            int numOfline = 0;
+            while ((sCurrentLine = br.readLine()) != null) {
+              Assert.assertEquals("The current is" + sCurrentLine,
+                  expectedContent.get(numOfline), sCurrentLine.trim());
+              numOfline++;
+            }
+
+          } catch (IOException e) {
+            e.printStackTrace();
+          } finally {
+            try {
+              if (br != null)
+                br.close();
+            } catch (IOException ex) {
+              ex.printStackTrace();
+            }
+          }
+        }
+      }
+    }
+  }
 
 }