diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index f3ce64c..fb64837 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -75,6 +76,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; /** * Client for Distributed Shell application submission to YARN. @@ -164,6 +166,9 @@ private long attemptFailuresValidityInterval = -1; + private String logAggregationIncludePattern = ""; + private String logAggregationExcludePattern = ""; + // Debug flag boolean debugFlag = false; @@ -255,6 +260,12 @@ public Client(Configuration conf) throws Exception { "the validityInterval into failure count. " + "If failure count reaches to maxAppAttempts, " + "the application will be failed."); + opts.addOption("logAggregationIncludePattern", true, "It uses Java Regex " + + "to filter the log files which match the defined include pattern " + + "and those log files will be uploaded."); + opts.addOption("logAggregationExcludePattern", true, "It uses Java Regex " + + "to filter the log files which match the defined exclude pattern " + + "and those log files will not be uploaded."); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -383,6 +394,14 @@ public boolean init(String[] args) throws ParseException { Long.parseLong(cliParser.getOptionValue( "attempt_failures_validity_interval", "-1")); + if (cliParser.hasOption("logAggregationIncludePattern")) { + logAggregationIncludePattern = + cliParser.getOptionValue("logAggregationIncludePattern"); + } + if (cliParser.hasOption("logAggregationExcludePattern")) { + logAggregationExcludePattern = + cliParser.getOptionValue("logAggregationExcludePattern"); + } log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true; @@ -472,6 +491,15 @@ public boolean run() throws IOException, YarnException { .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + LogAggregationContext logAggregationContext = + Records.newRecord(LogAggregationContext.class); + if (!logAggregationIncludePattern.isEmpty()) { + logAggregationContext.setIncludePattern(logAggregationIncludePattern); + } + if (!logAggregationExcludePattern.isEmpty()) { + logAggregationContext.setExcludePattern(logAggregationExcludePattern); + } + appContext.setLogAggregationContext(logAggregationContext); // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 6dff94c..16d975c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -25,6 +25,7 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; +import java.io.PrintStream; import java.io.PrintWriter; import java.net.InetAddress; import java.net.URL; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.cli.LogsCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; @@ -73,6 +75,8 @@ public void setup() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class); conf.set("yarn.log.dir", "target"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/app-remote/"); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); if (yarnCluster == null) { yarnCluster = new MiniYARNCluster( @@ -776,6 +780,91 @@ public void testDebugFlag() throws Exception { Assert.assertTrue(client.run()); } + @Test (timeout = 90000) + public void testDSWithLogAggregationContext() throws Exception{ + Configuration conf = yarnCluster.getConfig(); + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--logAggregationIncludePattern", + "^std*", + "--logAggregationExcludePattern", + "stderr" + }; + + LOG.info("Initializing DS Client"); + final Client client = new Client(conf); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + final AtomicBoolean result = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + result.set(client.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + ApplicationReport appReport = null; + while(true) { + List apps = yarnClient.getApplications(); + if (apps.size() == 0 ) { + Thread.sleep(10); + continue; + } + appReport = apps.get(0); + if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) { + break; + } + } + t.join(); + LOG.info("Client run completed. Result=" + result); + Assert.assertTrue(result.get()); + Assert.assertNotNull(appReport); + + ByteArrayOutputStream sysOutStream = new ByteArrayOutputStream(); + PrintStream sysOut = new PrintStream(sysOutStream); + System.setOut(sysOut); + LogsCLI cli = new LogsCLI(); + cli.setConf(conf); + int exitCode = -1; + final int maxAttempts = 50; + int count = 0; + while (count <= maxAttempts) { + exitCode = cli.run( new String[] { + "-applicationId", appReport.getApplicationId().toString() } ); + if (exitCode != 0) { + count ++; + Thread.sleep(500); + } else { + break; + } + } + Assert.assertTrue(exitCode == 0); + Assert.assertTrue(sysOutStream.toString().contains("LogType: stdout")); + Assert.assertTrue(!sysOutStream.toString().contains("LogType: stderr")); + } + private int verifyContainerLog(int containerNum, List expectedContent, boolean count, String expectedWord) { File logFolder =