diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 039ba3c..56f8f9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -18,14 +18,17 @@ package org.apache.hadoop.yarn.client.cli; +import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.io.PrintStream; import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; - +import java.util.Set; import javax.ws.rs.core.MediaType; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -42,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -52,7 +56,9 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; +import org.apache.hadoop.yarn.logaggregation.LogsCLIOptions; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -83,6 +89,7 @@ private static final String SHOW_META_INFO = "show_meta_info"; private static final String LIST_NODES_OPTION = 
"list_nodes"; private static final String OUT_OPTION = "out"; + private static final String TAR_OPTION = "tar"; public static final String HELP_CMD = "help"; @Override @@ -108,6 +115,7 @@ public int run(String[] args) throws Exception { String[] logFiles = null; List amContainersList = new ArrayList(); String localDir = null; + boolean tar = false; try { CommandLine commandLine = parser.parse(opts, args, true); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); @@ -118,6 +126,7 @@ public int run(String[] args) throws Exception { showMetaInfo = commandLine.hasOption(SHOW_META_INFO); nodesList = commandLine.hasOption(LIST_NODES_OPTION); localDir = commandLine.getOptionValue(OUT_OPTION); + tar = commandLine.hasOption(TAR_OPTION); if (getAMContainerLogs) { try { amContainersList = parseAMContainer(commandLine, printOpts); @@ -181,9 +190,9 @@ public int run(String[] args) throws Exception { } } - ContainerLogsRequest request = new ContainerLogsRequest(appId, + LogsCLIOptions request = new LogsCLIOptions(appId, isApplicationFinished(appState), appOwner, - nodeAddress, null, containerIdStr); + nodeAddress, null, containerIdStr, localDir, tar); if (showMetaInfo) { return showMetaInfo(request, logCliHelper); @@ -196,7 +205,7 @@ public int run(String[] args) throws Exception { // To get am logs if (getAMContainerLogs) { return fetchAMContainerLogs(request, amContainersList, - logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } int resultCode = 0; @@ -208,12 +217,10 @@ public int run(String[] args) throws Exception { + " does not have the container:" + containerId); return -1; } - return fetchContainerLogs(request, logFiles, - logCliHelper, localDir); + return fetchContainerLogs(request, logFiles, logCliHelper); } else { if (nodeAddress == null) { - resultCode = fetchApplicationLogs(appId, appOwner, - logCliHelper, localDir); + resultCode = fetchApplicationLogs(request, logCliHelper); } else { System.err.println("Should at least provide ContainerId!"); 
printHelpMessage(printOpts); @@ -352,16 +359,17 @@ private boolean fetchAllLogFiles(String[] logFiles) { } private void printContainerLogsFromRunningApplication(Configuration conf, - ContainerLogsRequest request, String[] logFiles, - LogCLIHelpers logCliHelper, String localDir) throws IOException { - String appId = request.getAppId().toString(); + LogsCLIOptions request, String[] logFiles, + LogCLIHelpers logCliHelper) throws IOException { String containerIdStr = request.getContainerId().toString(); String[] requestedLogFiles = logFiles; String nodeHttpAddress = request.getNodeHttpAddress(); - String nodeId = request.getNodeId(); - String appOwner = request.getAppOwner(); + final String nodeId = request.getNodeId(); + String localDir = request.getOutputLocalDir(); PrintStream out = logCliHelper.createPrintStream(localDir, nodeId, containerIdStr); + Set filesToTar = new HashSet(); + Set dirsToTar = new HashSet(); try { // fetch all the log files for the container if (fetchAllLogFiles(logFiles)) { @@ -395,31 +403,72 @@ private void printContainerLogsFromRunningApplication(Configuration conf, + nodeId); } } + if (localDir != null && !localDir.isEmpty() + && request.isTarOutputFile()) { + Path dirToTar = new Path(localDir, + LogAggregationUtils.getNodeString(nodeId)); + Path fileToTar = new Path(LogAggregationUtils.getNodeString(nodeId), + containerIdStr); + dirsToTar.add(dirToTar.toString()); + filesToTar.add(fileToTar.toString()); + } } finally { logCliHelper.closePrintStream(out); } // for the case, we have already uploaded partial logs in HDFS - logCliHelper.dumpAContainersLogsForALogType(appId, containerIdStr, nodeId, - appOwner, Arrays.asList(requestedLogFiles), false, localDir); + // if we have, we should compress the logs from HDFS and logs from + // NodeManager together.
+ // Remember the requested tar setting before disabling it for the helper + // call; re-reading the flag afterwards would always yield false. + final boolean tarOutput = request.isTarOutputFile(); + request.setTarOutputFile(false); + int aggregatedLogs = logCliHelper.dumpAContainersLogsForALogType(request, + Arrays.asList(requestedLogFiles), false); + request.setTarOutputFile(tarOutput); + if (localDir != null && !localDir.isEmpty() + && tarOutput && aggregatedLogs == 0) { + File rootDir = new File(localDir); + File[] nodeLists = rootDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.contains(LogAggregationUtils.getNodeString(nodeId)); + } + }); + if (nodeLists != null && nodeLists.length > 0) { + for (File nodeList : nodeLists) { + File[] logLists = nodeList.listFiles(); + if (logLists == null) { + // not a listable directory, so never schedule it for deletion + continue; + } + dirsToTar.add(nodeList.getAbsolutePath()); + for (File log : logLists) { + // relative to localDir, matching how tarTheFiles resolves entries + filesToTar.add( + new Path(nodeList.getName(), log.getName()).toString()); + } + } + } + } + // tar all the output files, and delete them afterward + logCliHelper.tarTheFiles(localDir, + containerIdStr + LogCLIHelpers.TAR_POSTFIX, + filesToTar, dirsToTar); } private void printContainerLogsForFinishedApplication( - ContainerLogsRequest request, String[] logFiles, - LogCLIHelpers logCliHelper, String localDir) + LogsCLIOptions request, String[] logFiles, + LogCLIHelpers logCliHelper) throws IOException { - logCliHelper.dumpAContainersLogsForALogType(request.getAppId().toString(), - request.getContainerId().toString(), request.getNodeId(), - request.getAppOwner(), logFiles != null ? Arrays.asList(logFiles) - : null, localDir); + logCliHelper.dumpAContainersLogsForALogType(request, logFiles != null + ? Arrays.asList(logFiles) : null); } private int printContainerLogsForFinishedApplicationWithoutNodeId( - String appId, String containerId, String[] logFiles, - LogCLIHelpers logCliHelper, String appOwner, String localDir) - throws IOException { - return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId(appId, - containerId, appOwner, logFiles != null ? 
- Arrays.asList(logFiles) : null, localDir); + LogsCLIOptions request, String[] logFiles, + LogCLIHelpers logCliHelper) throws IOException { + return logCliHelper.dumpAContainersLogsForALogTypeWithoutNodeId( + request, logFiles != null ? Arrays.asList(logFiles) : null); } private ContainerReport getContainerReport(String containerIdStr) @@ -440,12 +479,12 @@ private boolean isApplicationFinished(YarnApplicationState appState) { } private int printAMContainerLogs(Configuration conf, - ContainerLogsRequest request, List amContainers, - String[] logFiles, LogCLIHelpers logCliHelper, String localDir) + LogsCLIOptions request, List amContainers, + String[] logFiles, LogCLIHelpers logCliHelper) throws Exception { List amContainersList = null; - List requests = - new ArrayList(); + List requests = + new ArrayList(); boolean getAMContainerLists = false; String appId = request.getAppId().toString(); String errorMessage = ""; @@ -454,7 +493,7 @@ private int printAMContainerLogs(Configuration conf, if (amContainersList != null && !amContainersList.isEmpty()) { getAMContainerLists = true; for (JSONObject amContainer : amContainersList) { - ContainerLogsRequest amRequest = new ContainerLogsRequest(request); + LogsCLIOptions amRequest = new LogsCLIOptions(request); amRequest.setContainerId(amContainer.getString("containerId")); amRequest.setNodeHttpAddress( amContainer.getString("nodeHttpAddress")); @@ -470,7 +509,7 @@ private int printAMContainerLogs(Configuration conf, if (amContainersList != null && !amContainersList.isEmpty()) { getAMContainerLists = true; for (JSONObject amContainer : amContainersList) { - ContainerLogsRequest amRequest = new ContainerLogsRequest( + LogsCLIOptions amRequest = new LogsCLIOptions( request); amRequest.setContainerId(amContainer.getString("amContainerId")); requests.add(amRequest); @@ -490,9 +529,8 @@ private int printAMContainerLogs(Configuration conf, } if (amContainers.contains("ALL")) { - for (ContainerLogsRequest amRequest : requests) { - 
outputAMContainerLogs(amRequest, conf, logFiles, - logCliHelper, localDir); + for (LogsCLIOptions amRequest : requests) { + outputAMContainerLogs(amRequest, conf, logFiles, logCliHelper); } System.out.println(); System.out.println("Specified ALL for -am option. " @@ -502,11 +540,11 @@ private int printAMContainerLogs(Configuration conf, int amContainerId = Integer.parseInt(amContainer.trim()); if (amContainerId == -1) { outputAMContainerLogs(requests.get(requests.size() - 1), conf, - logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } else { if (amContainerId <= requests.size()) { outputAMContainerLogs(requests.get(amContainerId - 1), conf, - logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } } } @@ -514,9 +552,9 @@ private int printAMContainerLogs(Configuration conf, return 0; } - private void outputAMContainerLogs(ContainerLogsRequest request, + private void outputAMContainerLogs(LogsCLIOptions request, Configuration conf, String[] logFiles, - LogCLIHelpers logCliHelper, String localDir) throws Exception { + LogCLIHelpers logCliHelper) throws Exception { String nodeHttpAddress = request.getNodeHttpAddress(); String containerId = request.getContainerId(); String nodeId = request.getNodeId(); @@ -539,7 +577,7 @@ private void outputAMContainerLogs(ContainerLogsRequest request, requestedLogFilesList = logFiles; } printContainerLogsForFinishedApplication(request, - requestedLogFilesList, logCliHelper, localDir); + requestedLogFilesList, logCliHelper); } } } else { @@ -552,12 +590,12 @@ private void outputAMContainerLogs(ContainerLogsRequest request, getContainerLogFiles(getConf(), containerId, nodeHttpAddress); } printContainerLogsFromRunningApplication(conf, - request, requestedLogFiles, logCliHelper, localDir); + request, requestedLogFiles, logCliHelper); } } } - private int showMetaInfo(ContainerLogsRequest request, + private int showMetaInfo(LogsCLIOptions request, LogCLIHelpers logCliHelper) throws IOException { if 
(!request.isAppFinished()) { System.err.println("The -show_meta_info command can be only used " @@ -571,7 +609,7 @@ private int showMetaInfo(ContainerLogsRequest request, } } - private int showNodeLists(ContainerLogsRequest request, + private int showNodeLists(LogsCLIOptions request, LogCLIHelpers logCliHelper) throws IOException { if (!request.isAppFinished()) { System.err.println("The -list_nodes command can be only used with " @@ -632,6 +670,7 @@ private Options createCommandOpts() { opts.addOption(OUT_OPTION, true, "Local directory for storing individual " + "container logs. The container logs will be stored based on the " + "node the container ran on."); + opts.addOption(TAR_OPTION, false, "Work with -out to compress the logs."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); @@ -652,6 +691,7 @@ private Options createPrintOpts(Options commandOpts) { printOpts.addOption(commandOpts.getOption(SHOW_META_INFO)); printOpts.addOption(commandOpts.getOption(LIST_NODES_OPTION)); printOpts.addOption(commandOpts.getOption(OUT_OPTION)); + printOpts.addOption(commandOpts.getOption(TAR_OPTION)); return printOpts; } @@ -686,9 +726,9 @@ private Options createPrintOpts(Options commandOpts) { return amContainersList; } - private int fetchAMContainerLogs(ContainerLogsRequest request, + private int fetchAMContainerLogs(LogsCLIOptions request, List amContainersList, String[] logFiles, - LogCLIHelpers logCliHelper, String localDir) throws Exception { + LogCLIHelpers logCliHelper) throws Exception { // if we do not specify the value for CONTAINER_LOG_FILES option, // we will only output syslog if (logFiles == null || logFiles.length == 0) { @@ -701,7 +741,7 @@ private int fetchAMContainerLogs(ContainerLogsRequest request, // related logs if (!request.isAppFinished()) { return printAMContainerLogs(getConf(), request, amContainersList, - 
logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } else { // If the application is in the final state, we will call RM webservice // to get all AppAttempts information first. If we get nothing, @@ -712,7 +752,7 @@ private int fetchAMContainerLogs(ContainerLogsRequest request, if (getConf().getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { return printAMContainerLogs(getConf(), request, amContainersList, - logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } else { ApplicationId appId = request.getAppId(); String appOwner = request.getAppOwner(); @@ -728,8 +768,8 @@ private int fetchAMContainerLogs(ContainerLogsRequest request, } } - private int fetchContainerLogs(ContainerLogsRequest request, - String[] logFiles, LogCLIHelpers logCliHelper, String localDir) + private int fetchContainerLogs(LogsCLIOptions request, + String[] logFiles, LogCLIHelpers logCliHelper) throws IOException { int resultCode = 0; String appIdStr = request.getAppId().toString(); @@ -750,8 +790,7 @@ private int fetchContainerLogs(ContainerLogsRequest request, } else { logs = Arrays.asList(logFiles); } - return logCliHelper.dumpAContainersLogsForALogType(appIdStr, - containerIdStr, nodeAddress, appOwner, logs, localDir); + return logCliHelper.dumpAContainersLogsForALogType(request, logs); } String nodeHttpAddress = null; String nodeId = null; @@ -773,8 +812,7 @@ private int fetchContainerLogs(ContainerLogsRequest request, requestedLogFiles = null; } return printContainerLogsForFinishedApplicationWithoutNodeId( - appIdStr, containerIdStr, requestedLogFiles, logCliHelper, - appOwner, localDir); + request, requestedLogFiles, logCliHelper); } else { System.err.println("Unable to get logs for this container:" + containerIdStr + "for the application:" + appIdStr @@ -794,7 +832,7 @@ private int fetchContainerLogs(ContainerLogsRequest request, logFiles = new String[] {"syslog"}; } 
printContainerLogsFromRunningApplication(getConf(), request, - logFiles, logCliHelper, localDir); + logFiles, logCliHelper); } else { String[] requestedLogFiles = logFiles; if(fetchAllLogFiles(logFiles)) { @@ -803,15 +841,24 @@ private int fetchContainerLogs(ContainerLogsRequest request, // If the application is in the final state, we will directly // get the container logs from HDFS. printContainerLogsForFinishedApplication(request, - requestedLogFiles, logCliHelper, localDir); + requestedLogFiles, logCliHelper); } return resultCode; } - private int fetchApplicationLogs(ApplicationId appId, String appOwner, - LogCLIHelpers logCliHelper, String localDir) throws IOException { + private int fetchApplicationLogs(LogsCLIOptions request, + LogCLIHelpers logCliHelper) throws IOException { + ApplicationId appId = request.getAppId(); + String appOwner = request.getAppOwner(); + if (!request.isAppFinished()) { + System.err.println("The application is still running. " + + "Please try later after the application finishes or " + + "use 'yarn logs -applicationId -containerId ' " + + "to get specific container logs"); + return -1; + } int resultCode = - logCliHelper.dumpAllContainersLogs(appId, appOwner, localDir); + logCliHelper.dumpAllContainersLogs(request); if (resultCode == -1) { System.err.println("Can not find the logs for the application: " + appId + " with the appOwner: " + appOwner); @@ -832,81 +879,4 @@ private String guessAppOwner(ApplicationReport appReport, } return appOwner; } - - private static class ContainerLogsRequest { - private ApplicationId appId; - private String containerId; - private String nodeId; - private String nodeHttpAddress; - private String appOwner; - private boolean appFinished; - - public ContainerLogsRequest(ContainerLogsRequest request) { - this.setAppId(request.getAppId()); - this.setAppFinished(request.isAppFinished()); - this.setAppOwner(request.getAppOwner()); - this.setNodeId(request.getNodeId()); - 
this.setNodeHttpAddress(request.getNodeHttpAddress()); - this.setContainerId(request.getContainerId()); - } - - public ContainerLogsRequest(ApplicationId applicationId, - boolean isAppFinished, String owner, - String address, String httpAddress, String container) { - this.setAppId(applicationId); - this.setAppFinished(isAppFinished); - this.setAppOwner(owner); - this.setNodeId(address); - this.setNodeHttpAddress(httpAddress); - this.setContainerId(container); - } - - public ApplicationId getAppId() { - return appId; - } - - public void setAppId(ApplicationId appId) { - this.appId = appId; - } - - public String getContainerId() { - return containerId; - } - - public void setContainerId(String containerId) { - this.containerId = containerId; - } - - public String getNodeId() { - return nodeId; - } - - public void setNodeId(String nodeAddress) { - this.nodeId = nodeAddress; - } - - public String getAppOwner() { - return appOwner; - } - - public void setAppOwner(String appOwner) { - this.appOwner = appOwner; - } - - public String getNodeHttpAddress() { - return nodeHttpAddress; - } - - public void setNodeHttpAddress(String nodeHttpAddress) { - this.nodeHttpAddress = nodeHttpAddress; - } - - public boolean isAppFinished() { - return appFinished; - } - - public void setAppFinished(boolean appFinished) { - this.appFinished = appFinished; - } - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index d649ce7..9c385af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -47,8 +47,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -195,6 +197,7 @@ public void testHelpMessage() throws Exception { pw.println(" for all the containers on the specific"); pw.println(" NodeManager. Currently, this option can"); pw.println(" only be used for finished applications."); + pw.println(" -tar Work with -out to compress the logs."); pw.close(); String appReportStr = baos.toString("UTF-8"); Assert.assertEquals(appReportStr, sysOutStream.toString()); @@ -569,6 +572,50 @@ public void testSaveContainerLogsLocally() throws Exception { String container2= readContainerContent(container2Dir[0].getPath(), fs); assertTrue(container2.contains("Hello " + containerId2 + " in syslog!")); + + // delete the localPath before we test -tar option + fs.delete(localPath, true); + exitCode = cli.run(new String[] {"-applicationId", + appId.toString(), + "-out" , localPath.toString(), "-tar"}); + assertTrue(exitCode == 0); + FileStatus[] tarFiles = fs.listStatus(localPath); + // We should only have one tar file + assertTrue(tarFiles.length == 1); + assertTrue(tarFiles[0].getPath().getName().equals(appId + ".tar")); + + // untar the tar file and check log content + File tarFile = new File(fs.makeQualified(tarFiles[0].getPath()).toUri()); + File localDirFile = new File(localPath.toString()); + FileUtil.unTar(tarFile, localDirFile); + nodeDir = fs.listStatus(localPath, new PathFilter() { + @Override + public boolean accept(Path path) { + return !path.getName().contains(".tar"); + } + }); + Arrays.sort(nodeDir); + assertTrue(nodeDir.length == 2); + assertTrue(nodeDir[0].getPath().getName().contains( + LogAggregationUtils.getNodeString(nodeId))); + 
assertTrue(nodeDir[1].getPath().getName().contains( + LogAggregationUtils.getNodeString(nodeId2))); + + container1Dir = fs.listStatus(nodeDir[0].getPath()); + assertTrue(container1Dir.length == 1); + assertTrue(container1Dir[0].getPath().getName().equals( + containerId1.toString())); + container1= readContainerContent(container1Dir[0].getPath(), fs); + assertTrue(container1.contains("Hello " + containerId1 + + " in syslog!")); + + container2Dir = fs.listStatus(nodeDir[1].getPath()); + assertTrue(container2Dir.length == 1); + assertTrue(container2Dir[0].getPath().getName().equals( + containerId2.toString())); + container2= readContainerContent(container2Dir[0].getPath(), fs); + assertTrue(container2.contains("Hello " + containerId2 + + " in syslog!")); } finally { fs.delete(new Path(remoteLogRootDir), true); fs.delete(new Path(rootLogDir), true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 339df9d..1fbc2b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -18,15 +18,25 @@ package org.apache.hadoop.yarn.logaggregation; +import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.nio.file.AccessDeniedException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import 
org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -44,19 +54,23 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.util.ConverterUtils; - import com.google.common.annotations.VisibleForTesting; public class LogCLIHelpers implements Configurable { private Configuration conf; + public static final String TAR_POSTFIX = ".tar"; @Private @VisibleForTesting public int dumpAContainersLogs(String appId, String containerId, String nodeId, String jobOwner) throws IOException { - return dumpAContainersLogsForALogType(appId, containerId, nodeId, jobOwner, - null, null); + LogsCLIOptions options = new LogsCLIOptions(); + options.setAppId(ConverterUtils.toApplicationId(appId)); + options.setContainerId(containerId); + options.setNodeId(nodeId); + options.setAppOwner(jobOwner); + return dumpAContainersLogsForALogType(options, null, false); } @Private @@ -108,25 +122,29 @@ public static String getOwnerForAppIdOrNull( @Private @VisibleForTesting - public int dumpAContainersLogsForALogType(String appId, String containerId, - String nodeId, String jobOwner, List logType, String localDir) - throws IOException { - return dumpAContainersLogsForALogType(appId, containerId, nodeId, - jobOwner, logType, true, localDir); + public int dumpAContainersLogsForALogType(LogsCLIOptions options, + List logType) throws IOException { + return dumpAContainersLogsForALogType(options, logType, true); } @Private @VisibleForTesting - public int dumpAContainersLogsForALogType(String appId, String containerId, - String nodeId, String jobOwner, List logType, - boolean 
outputFailure, String localDir) throws IOException { - ApplicationId applicationId = ConverterUtils.toApplicationId(appId); + public int dumpAContainersLogsForALogType(LogsCLIOptions options, + List logType, boolean outputFailure) throws IOException { + ApplicationId applicationId = options.getAppId(); + String jobOwner = options.getAppOwner(); + String nodeId = options.getNodeId(); + String containerId = options.getContainerId(); + String localDir = options.getOutputLocalDir(); + boolean isTarOutFile = options.isTarOutputFile(); RemoteIterator nodeFiles = getRemoteNodeFileDir( applicationId, jobOwner); if (nodeFiles == null) { return -1; } boolean foundContainerLogs = false; + Set filesToTar = new HashSet(); + Set dirsToTar = new HashSet(); while (nodeFiles.hasNext()) { FileStatus thisNodeFile = nodeFiles.next(); String fileName = thisNodeFile.getPath().getName(); @@ -158,6 +176,14 @@ public int dumpAContainersLogsForALogType(String appId, String containerId, foundContainerLogs = true; } } + if (localDir != null && !localDir.isEmpty() && isTarOutFile) { + Path dirToTar = new Path(localDir, + thisNodeFile.getPath().getName()); + Path fileToTar = new Path(thisNodeFile.getPath().getName(), + containerId); + dirsToTar.add(dirToTar.toString()); + filesToTar.add(fileToTar.toString()); + } } finally { if (reader != null) { reader.close(); @@ -166,6 +192,9 @@ public int dumpAContainersLogsForALogType(String appId, String containerId, } } } + // tar all the output files, and delete them afterward + tarTheFiles(localDir, containerId.toString() + TAR_POSTFIX, + filesToTar, dirsToTar); if (!foundContainerLogs) { if (outputFailure) { containerLogNotFound(containerId); @@ -176,15 +205,20 @@ public int dumpAContainersLogsForALogType(String appId, String containerId, } @Private - public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId, - String containerId, String jobOwner, List logType, - String localDir) throws IOException { - ApplicationId applicationId = 
ConverterUtils.toApplicationId(appId); + public int dumpAContainersLogsForALogTypeWithoutNodeId(LogsCLIOptions options, + List logType) throws IOException { + ApplicationId applicationId = options.getAppId(); + String jobOwner = options.getAppOwner(); + String containerId = options.getContainerId(); + String localDir = options.getOutputLocalDir(); + boolean isTarOutFile = options.isTarOutputFile(); RemoteIterator nodeFiles = getRemoteNodeFileDir( applicationId, jobOwner); if (nodeFiles == null) { return -1; } + Set filesToTar = new HashSet(); + Set dirsToTar = new HashSet(); boolean foundContainerLogs = false; while(nodeFiles.hasNext()) { FileStatus thisNodeFile = nodeFiles.next(); @@ -217,6 +251,14 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId, foundContainerLogs = true; } } + if (localDir != null && !localDir.isEmpty() && isTarOutFile) { + Path dirToTar = new Path(localDir, + thisNodeFile.getPath().getName()); + Path fileToTar = new Path(thisNodeFile.getPath().getName(), + containerId); + dirsToTar.add(dirToTar.toString()); + filesToTar.add(fileToTar.toString()); + } } finally { if (reader != null) { reader.close(); @@ -225,6 +267,9 @@ public int dumpAContainersLogsForALogTypeWithoutNodeId(String appId, } } } + // tar all the output files, and delete them afterward + tarTheFiles(localDir, containerId.toString() + TAR_POSTFIX, + filesToTar, dirsToTar); if (!foundContainerLogs) { containerLogNotFound(containerId); return -1; @@ -303,14 +348,19 @@ public int dumpAContainerLogsForALogType(String containerIdStr, } @Private - public int dumpAllContainersLogs(ApplicationId appId, String appOwner, - String localDir) throws IOException { + public int dumpAllContainersLogs(LogsCLIOptions options) throws IOException { + ApplicationId appId = options.getAppId(); + String appOwner = options.getAppOwner(); + String localDir = options.getOutputLocalDir(); + boolean isTarOutFile = options.isTarOutputFile(); RemoteIterator nodeFiles = 
getRemoteNodeFileDir( appId, appOwner); if (nodeFiles == null) { return -1; } boolean foundAnyLogs = false; + Set filesToTar = new HashSet(); + Set dirsToTar = new HashSet(); while (nodeFiles.hasNext()) { FileStatus thisNodeFile = nodeFiles.next(); if (thisNodeFile.getPath().getName().equals(appId + ".har")) { @@ -325,7 +375,6 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner, new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath()); try { - DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -348,6 +397,14 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner, break; } } + if (localDir != null && !localDir.isEmpty() && isTarOutFile) { + Path dirToTar = new Path(localDir, + thisNodeFile.getPath().getName()); + Path fileToTar = new Path(thisNodeFile.getPath().getName(), + key.toString()); + dirsToTar.add(dirToTar.toString()); + filesToTar.add(fileToTar.toString()); + } } finally { closePrintStream(out); } @@ -361,6 +418,9 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner, } } } + // tar all the output files, and delete them afterward + tarTheFiles(localDir, appId.toString() + TAR_POSTFIX, + filesToTar, dirsToTar); if (!foundAnyLogs) { emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString()); return -1; @@ -547,4 +607,63 @@ public void closePrintStream(PrintStream out) { IOUtils.closeQuietly(out); } } + + /** + * Packs the given files into a gzip-compressed tar archive under + * {@code localDir} and then removes the listed files and directories. + * Does nothing when {@code localDir} is null/empty or there are no files. + * + * @param localDir local output directory; entries are resolved against it + * @param tarFileName name of the archive to create inside localDir + * @param filesToTar paths, relative to localDir, of the files to pack + * @param dirsToTar directories to delete once the archive is written + * @throws IOException if the archive cannot be written + */ + @Private + public void tarTheFiles(String localDir, String tarFileName, + Set filesToTar, Set dirsToTar) throws IOException { + if (localDir != null && !localDir.isEmpty() && !filesToTar.isEmpty()) { + Path destFile = new Path(localDir, tarFileName); + Set absolutePaths = new HashSet(); + TarArchiveOutputStream tOut = null; + try { + tOut = new TarArchiveOutputStream(new GzipCompressorOutputStream( + new BufferedOutputStream(new FileOutputStream( + new File(destFile.toString()))))); + for (String fileToTar : filesToTar) { + String absolutePath = 
new Path(localDir, fileToTar).toString(); + absolutePaths.add(absolutePath); + File f = new File(absolutePath); + if (f.exists()) { + TarArchiveEntry tarEntry = new TarArchiveEntry(f, fileToTar); + tOut.putArchiveEntry(tarEntry); + // close every source stream so file descriptors are not leaked + FileInputStream in = new FileInputStream(f); + try { + IOUtils.copy(in, tOut); + } finally { + IOUtils.closeQuietly(in); + } + tOut.closeArchiveEntry(); + } + } + // close explicitly so that a failure while finishing the archive + // propagates before the source files are deleted below + tOut.close(); + } finally { + IOUtils.closeQuietly(tOut); + } + removeTempFiles(absolutePaths); + removeTempFiles(dirsToTar); + } + } + + private void removeTempFiles(Set filesToRemove) { + for(String fileToRemove : filesToRemove) { + File f = new File(fileToRemove); + if (f.exists()) { + f.delete(); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogsCLIOptions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogsCLIOptions.java new file mode 100644 index 0000000..4df336b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogsCLIOptions.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class LogsCLIOptions { + private ApplicationId appId; + private String containerId; + private String nodeId; + private String nodeHttpAddress; + private String appOwner; + private boolean appFinished; + private String outputLocalDir; + private boolean tarOutputFile; + + public LogsCLIOptions() {} + + public LogsCLIOptions(LogsCLIOptions request) { + this.setAppId(request.getAppId()); + this.setAppFinished(request.isAppFinished()); + this.setAppOwner(request.getAppOwner()); + this.setNodeId(request.getNodeId()); + this.setNodeHttpAddress(request.getNodeHttpAddress()); + this.setContainerId(request.getContainerId()); + this.setOutputLocalDir(request.getOutputLocalDir()); + this.setTarOutputFile(request.isTarOutputFile()); + } + + public LogsCLIOptions(ApplicationId 
applicationId, + boolean isAppFinished, String owner, + String address, String httpAddress, String container, String localDir, + boolean isTar) { + this.setAppId(applicationId); + this.setAppFinished(isAppFinished); + this.setAppOwner(owner); + this.setNodeId(address); + this.setNodeHttpAddress(httpAddress); + this.setContainerId(container); + this.setOutputLocalDir(localDir); + this.setTarOutputFile(isTar); + } + + public ApplicationId getAppId() { + return appId; + } + + public void setAppId(ApplicationId appId) { + this.appId = appId; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeAddress) { + this.nodeId = nodeAddress; + } + + public String getAppOwner() { + return appOwner; + } + + public void setAppOwner(String appOwner) { + this.appOwner = appOwner; + } + + public String getNodeHttpAddress() { + return nodeHttpAddress; + } + + public void setNodeHttpAddress(String nodeHttpAddress) { + this.nodeHttpAddress = nodeHttpAddress; + } + + public boolean isAppFinished() { + return appFinished; + } + + public void setAppFinished(boolean appFinished) { + this.appFinished = appFinished; + } + + public String getOutputLocalDir() { + return outputLocalDir; + } + + public void setOutputLocalDir(String outputLocalDir) { + this.outputLocalDir = outputLocalDir; + } + + public boolean isTarOutputFile() { + return tarOutputFile; + } + + public void setTarOutputFile(boolean tarOutputFile) { + this.tarOutputFile = tarOutputFile; + } +}