diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 5f60392..c9e00ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -467,7 +468,7 @@ public ContainerReport getContainerReport(String containerIdStr) assertTrue(exitCode == 0); assertTrue(sysOutStream.toString().contains( logMessage(containerId1, "syslog"))); - assertTrue(sysOutStream.toString().contains("Log Upload Time")); + assertTrue(sysOutStream.toString().contains("LogLastModifiedTime")); assertTrue(!sysOutStream.toString().contains( "Logs for container " + containerId1.toString() + " are not present in this log-file.")); @@ -491,8 +492,12 @@ public ContainerReport getContainerReport(String containerIdStr) String logMessage = logMessage(containerId3, "stdout"); int fileContentSize = logMessage.getBytes().length; - int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length; - + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:stdout"; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + int tailContentSize = sb.toString().length(); // specify how many bytes we should get from logs // specify a position number, it would get the first n bytes from // container log diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java new file mode 100644 index 0000000..eda35eb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; + +/** + * Utils for rendering aggregated logs block. + * + */ +@Private +public class LogAggregationWebUtils { + + /** + * Verify and parse start index from html. + * @param html the html + * @param startStr the start index string + * @return the startIndex + */ + public static long verifyAndGetLogStartIndex(Block html, String startStr) + throws NumberFormatException { + long start = -4096; + + if (startStr != null && !startStr.isEmpty()) { + start = Long.parseLong(startStr); + } + return start; + } + + /** + * Verify and parse end index from html. + * @param html the html + * @param endStr the end index string + * @return the endIndex + */ + public static long verifyAndGetLogEndIndex(Block html, String endStr) + throws NumberFormatException { + long end = Long.MAX_VALUE; + + if (endStr != null && !endStr.isEmpty()) { + end = Long.parseLong(endStr); + } + return end; + } + + /** + * Verify and parse containerId. + * @param html the html + * @param containerIdStr the containerId string + * @return the {@link ContainerId} + */ + public static ContainerId verifyAndGetContainerId(Block html, + String containerIdStr) { + if (containerIdStr == null || containerIdStr.isEmpty()) { + html.h1().__("Cannot get container logs without a ContainerId").__(); + return null; + } + ContainerId containerId = null; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException e) { + html.h1() + .__("Cannot get container logs for invalid containerId: " + + containerIdStr).__(); + return null; + } + return containerId; + } + + /** + * Verify and parse NodeId. + * @param html the html + * @param nodeIdStr the nodeId string + * @return the {@link NodeId} + */ + public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) { + if (nodeIdStr == null || nodeIdStr.isEmpty()) { + html.h1().__("Cannot get container logs without a NodeId").__(); + return null; + } + NodeId nodeId = null; + try { + nodeId = NodeId.fromString(nodeIdStr); + } catch (IllegalArgumentException e) { + html.h1().__("Cannot get container logs. Invalid nodeId: " + nodeIdStr) + .__(); + return null; + } + return nodeId; + } + + /** + * Verify and parse the application owner. + * @param html the html + * @param appOwner the Application owner + * @return the appOwner + */ + public static String verifyAndGetAppOwner(Block html, String appOwner) { + if (appOwner == null || appOwner.isEmpty()) { + html.h1().__("Cannot get container logs without an app owner").__(); + } + return appOwner; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index cf34a1a..d7cde4c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.logaggregation; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; @@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormat; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormatFactory; import com.google.common.annotations.VisibleForTesting; public class LogCLIHelpers implements Configurable { @@ -56,6 +53,7 @@ "Container: %s on %s"; private Configuration conf; + private LogAggregationFileFormatFactory factory; @Private @VisibleForTesting @@ -130,309 +128,71 @@ public int dumpAContainerLogsForLogType(ContainerLogsRequest options) @VisibleForTesting public int dumpAContainerLogsForLogType(ContainerLogsRequest options, boolean outputFailure) throws IOException { - ApplicationId applicationId = options.getAppId(); - String jobOwner = options.getAppOwner(); - String nodeId = options.getNodeId(); - String containerId = options.getContainerId(); - String localDir = options.getOutputLocalDir(); - List logType = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - applicationId, jobOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundContainerLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - String fileName = thisNodeFile.getPath().getName(); - if (fileName.equals(applicationId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) - && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - PrintStream out = createPrintStream(localDir, fileName, containerId); - try { - reader = new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (getContainerLogsStream(containerId, reader) == null) { - continue; - } - String containerString = String.format(CONTAINER_ON_NODE_PATTERN, - containerId, thisNodeFile.getPath().getName()); - out.println(containerString); - out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - // We have to re-create reader object to reset the stream index - // after calling getContainerLogsStream which would move the stream - // index to the end of the log file. - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (logType == null || logType.isEmpty()) { - if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime(), options.getBytes()) > -1) { - foundContainerLogs = true; - } - } else { - if (dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType, - options.getBytes()) > -1) { - foundContainerLogs = true; - } - } - } finally { - if (reader != null) { - reader.close(); - } - closePrintStream(out); + try { + boolean foundAnyLogs = this.getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogs(options, null); + if (!foundAnyLogs) { + if (outputFailure) { + containerLogNotFound(options.getContainerId()); } + return -1; } - } - if (!foundContainerLogs) { - if (outputFailure) { - containerLogNotFound(containerId); - } + return 0; + } catch (IOException ex) { + System.err.println(ex.getMessage()); return -1; } - return 0; } @Private public int dumpAContainerLogsForLogTypeWithoutNodeId( ContainerLogsRequest options) throws IOException { - ApplicationId applicationId = options.getAppId(); - String jobOwner = options.getAppOwner(); - String containerId = options.getContainerId(); - String localDir = options.getOutputLocalDir(); - List logType = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - applicationId, jobOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundContainerLogs = false; - while(nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (!thisNodeFile.getPath().getName().endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - PrintStream out = System.out; - try { - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - if (getContainerLogsStream(containerId, reader) == null) { - continue; - } - // We have to re-create reader object to reset the stream index - // after calling getContainerLogsStream which would move the stream - // index to the end of the log file. - reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - out = createPrintStream(localDir, thisNodeFile.getPath().getName(), - containerId); - String containerString = String.format(CONTAINER_ON_NODE_PATTERN, - containerId, thisNodeFile.getPath().getName()); - out.println(containerString); - out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - if (logType == null || logType.isEmpty()) { - if (dumpAContainerLogs(containerId, reader, out, - thisNodeFile.getModificationTime(), options.getBytes()) > -1) { - foundContainerLogs = true; - } - } else { - if (dumpAContainerLogsForALogType(containerId, reader, out, - thisNodeFile.getModificationTime(), logType, - options.getBytes()) > -1) { - foundContainerLogs = true; - } - } - } finally { - if (reader != null) { - reader.close(); - } - closePrintStream(out); - } - } - } - if (!foundContainerLogs) { - containerLogNotFound(containerId); - return -1; - } - return 0; - } - - @Private - public int dumpAContainerLogs(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, long bytes) throws IOException { - DataInputStream valueStream = getContainerLogsStream( - containerIdStr, reader); - - if (valueStream == null) { - return -1; - } - - boolean foundContainerLogs = false; - while (true) { - try { - LogReader.readAContainerLogsForALogType(valueStream, out, - logUploadedTime, bytes); - foundContainerLogs = true; - } catch (EOFException eof) { - break; - } - } - if (foundContainerLogs) { - return 0; - } - return -1; - } - - private DataInputStream getContainerLogsStream(String containerIdStr, - AggregatedLogFormat.LogReader reader) throws IOException { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null && !key.toString().equals(containerIdStr)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - return valueStream; - } - - @Private - public int dumpAContainerLogsForALogType(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out, - long logUploadedTime, List logType, long bytes) - throws IOException { - DataInputStream valueStream = getContainerLogsStream( - containerIdStr, reader); - if (valueStream == null) { - return -1; - } - - boolean foundContainerLogs = false; - while (true) { - try { - int result = LogReader.readContainerLogsForALogType( - valueStream, out, logUploadedTime, logType, bytes); - if (result == 0) { - foundContainerLogs = true; - } - } catch (EOFException eof) { - break; + try { + boolean foundAnyLogs = getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogs( + options, null); + if (!foundAnyLogs) { + containerLogNotFound(options.getContainerId()); + return -1; } - } - - if (foundContainerLogs) { return 0; + } catch (IOException ex) { + System.err.println(ex.getMessage()); + return -1; } - return -1; } @Private public int dumpAllContainersLogs(ContainerLogsRequest options) throws IOException { - ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); - String localDir = options.getOutputLocalDir(); - List logTypes = new ArrayList(options.getLogTypes()); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { - return -1; - } - boolean foundAnyLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (thisNodeFile.getPath().getName().equals(appId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - - while (valueStream != null) { - PrintStream out = createPrintStream(localDir, - thisNodeFile.getPath().getName(), key.toString()); - try { - String containerString = String.format( - CONTAINER_ON_NODE_PATTERN, key, - thisNodeFile.getPath().getName()); - out.println(containerString); - out.println("LogAggregationType: AGGREGATED"); - out.println(StringUtils.repeat("=", containerString.length())); - while (true) { - try { - if (logTypes == null || logTypes.isEmpty()) { - LogReader.readAContainerLogsForALogType(valueStream, out, - thisNodeFile.getModificationTime(), - options.getBytes()); - foundAnyLogs = true; - } else { - int result = LogReader.readContainerLogsForALogType( - valueStream, out, thisNodeFile.getModificationTime(), - logTypes, options.getBytes()); - if (result == 0) { - foundAnyLogs = true; - } - } - } catch (EOFException eof) { - break; - } - } - } finally { - closePrintStream(out); - } - - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } + try { + boolean foundAnyLogs = getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogs( + options, null); + if (!foundAnyLogs) { + emptyLogDir(LogAggregationUtils.getRemoteAppLogDir( + conf, options.getAppId(), options.getAppOwner()) + .toString()); + return -1; } - } - if (!foundAnyLogs) { - emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner) - .toString()); + return 0; + } catch (IOException ex) { + System.err.println(ex.getMessage()); return -1; } - return 0; } @Private public int printAContainerLogMetadata(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { - ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); String nodeId = options.getNodeId(); String containerIdStr = options.getContainerId(); List containersLogMeta; try { - containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS( - conf, appId, containerIdStr, nodeId, appOwner); + containersLogMeta = getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); } catch (Exception ex) { err.println(ex.getMessage()); return -1; @@ -473,8 +233,26 @@ public void printNodesList(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { ApplicationId appId = options.getAppId(); String appOwner = options.getAppOwner(); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); + LogAggregationFileFormat fileFormat = null; + try { + fileFormat = getFileFormat(appId, appOwner); + } catch (Exception ex) { + err.println(ex.getMessage()); + return; + } + RemoteIterator nodeFiles = null; + try { + nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId, + appOwner, fileFormat.getRemoteRootLogDir(), + fileFormat.getRemoteRootLogDirSuffix()); + } catch (FileNotFoundException fnf) { + logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( + conf, appId, appOwner).toString()); + } catch (AccessControlException | AccessDeniedException ace) { + logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( + conf, appId, appOwner).toString(), appOwner, + ace.getMessage()); + } if (nodeFiles == null) { return; } @@ -497,44 +275,21 @@ public void printNodesList(ContainerLogsRequest options, public void printContainersList(ContainerLogsRequest options, PrintStream out, PrintStream err) throws IOException { ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); String nodeId = options.getNodeId(); - String nodeIdStr = (nodeId == null) ? null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { - return; - } boolean foundAnyLogs = false; - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - out.println(String.format(CONTAINER_ON_NODE_PATTERN, key, - thisNodeFile.getPath().getName())); - foundAnyLogs = true; - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } - } + List containersLogMeta = new ArrayList<>(); + try { + containersLogMeta = getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); + } catch (Exception ex) { + err.println(ex.getMessage()); + } + for(ContainerLogMeta logMeta : containersLogMeta) { + out.println(String.format(CONTAINER_ON_NODE_PATTERN, + logMeta.getContainerId(), + logMeta.getNodeId())); + foundAnyLogs = true; } if (!foundAnyLogs) { if (nodeId != null) { @@ -547,26 +302,6 @@ public void printContainersList(ContainerLogsRequest options, } } - private RemoteIterator getRemoteNodeFileDir(ApplicationId appId, - String appOwner) throws IOException { - RemoteIterator nodeFiles = null; - try { - nodeFiles = LogAggregationUtils.getRemoteNodeFileDir( - conf, appId, appOwner); - } catch (FileNotFoundException fnf) { - logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString()); - } catch (AccessControlException | AccessDeniedException ace) { - logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString(), appOwner, - ace.getMessage()); - } catch (IOException ioe) { - logDirIOError(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString(), ioe.getMessage()); - } - return nodeFiles; - } - @Override public void setConf(Configuration conf) { this.conf = conf; @@ -600,11 +335,6 @@ private static void logDirNoAccessPermission(String remoteAppLogDir, + ". Error message found: " + errorMessage); } - private static void logDirIOError(String remoteAppLogDir, String errMsg) { - System.err.println("Cannot access to " + remoteAppLogDir + - ". Error message found: " + errMsg); - } - @Private public PrintStream createPrintStream(String localDir, String nodeId, String containerId) throws IOException { @@ -628,59 +358,29 @@ public void closePrintStream(PrintStream out) { @Private public Set listContainerLogs(ContainerLogsRequest options) throws IOException { + List containersLogMeta; Set logTypes = new HashSet(); - ApplicationId appId = options.getAppId(); - String appOwner = options.getAppOwner(); - String nodeId = options.getNodeId(); - String containerIdStr = options.getContainerId(); - boolean getAllContainers = (containerIdStr == null); - String nodeIdStr = (nodeId == null) ? null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = getRemoteNodeFileDir( - appId, appOwner); - if (nodeFiles == null) { + try { + containersLogMeta = getFileFormat(options.getAppId(), + options.getAppOwner()).readAggregatedLogsMeta( + options); + } catch (Exception ex) { + System.err.println(ex.getMessage()); return logTypes; } - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(getConf(), - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - if (getAllContainers || (key.toString().equals(containerIdStr))) { - while (true) { - try { - String logFile = LogReader.readContainerMetaDataAndSkipData( - valueStream).getFirst(); - logTypes.add(logFile); - } catch (EOFException eof) { - break; - } - } - if (!getAllContainers) { - break; - } - } - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } + for (ContainerLogMeta logMeta: containersLogMeta) { + for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) { + logTypes.add(fileInfo.getFileName()); } } return logTypes; } + + private LogAggregationFileFormat getFileFormat(ApplicationId appId, + String appOwner) throws IOException { + if (factory == null) { + factory = new LogAggregationFileFormatFactory(conf); + } + return factory.getFileFormatForRead(appId, appOwner); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java index ae2517a..4a5dfce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java @@ -17,25 +17,11 @@ */ package org.apache.hadoop.yarn.logaggregation; -import java.io.DataInputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.commons.math3.util.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; -import org.apache.hadoop.yarn.util.Times; /** * This class contains several utility function which could be used in different @@ -50,81 +36,6 @@ private LogToolUtils() {} "Container: %s on %s"; /** - * Return a list of {@link ContainerLogMeta} for a container - * from Remote FileSystem. - * - * @param conf the configuration - * @param appId the applicationId - * @param containerIdStr the containerId - * @param nodeId the nodeId - * @param appOwner the application owner - * @return a list of {@link ContainerLogMeta} - * @throws IOException if there is no available log file - */ - public static List getContainerLogMetaFromRemoteFS( - Configuration conf, ApplicationId appId, String containerIdStr, - String nodeId, String appOwner) throws IOException { - List containersLogMeta = new ArrayList<>(); - boolean getAllContainers = (containerIdStr == null); - String nodeIdStr = (nodeId == null) ? null - : LogAggregationUtils.getNodeString(nodeId); - RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, appOwner); - if (nodeFiles == null) { - throw new IOException("There is no available log fils for " - + "application:" + appId); - } - while (nodeFiles.hasNext()) { - FileStatus thisNodeFile = nodeFiles.next(); - if (nodeIdStr != null) { - if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { - continue; - } - } - if (!thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = - new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - try { - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null) { - if (getAllContainers || (key.toString().equals(containerIdStr))) { - ContainerLogMeta containerLogMeta = new ContainerLogMeta( - key.toString(), thisNodeFile.getPath().getName()); - while (true) { - try { - Pair logMeta = - LogReader.readContainerMetaDataAndSkipData( - valueStream); - containerLogMeta.addLogMeta( - logMeta.getFirst(), - logMeta.getSecond(), - Times.format(thisNodeFile.getModificationTime())); - } catch (EOFException eof) { - break; - } - } - containersLogMeta.add(containerLogMeta); - if (!getAllContainers) { - break; - } - } - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - } finally { - reader.close(); - } - } - } - return containersLogMeta; - } - - /** * Output container log. * @param containerId the containerId * @param nodeId the nodeId @@ -195,82 +106,4 @@ public static void outputContainerLog(String containerId, String nodeId, os.flush(); } - public static boolean outputAggregatedContainerLog(Configuration conf, - ApplicationId appId, String appOwner, - String containerId, String nodeId, - String logFileName, long outputSize, OutputStream os, - byte[] buf) throws IOException { - boolean findLogs = false; - RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, appOwner); - while (nodeFiles != null && nodeFiles.hasNext()) { - final FileStatus thisNodeFile = nodeFiles.next(); - String nodeName = thisNodeFile.getPath().getName(); - if (nodeName.equals(appId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if ((nodeId == null || nodeName.contains(LogAggregationUtils - .getNodeString(nodeId))) && !nodeName.endsWith( - LogAggregationUtils.TMP_FILE_SUFFIX)) { - AggregatedLogFormat.LogReader reader = null; - try { - reader = new AggregatedLogFormat.LogReader(conf, - thisNodeFile.getPath()); - DataInputStream valueStream; - LogKey key = new LogKey(); - valueStream = reader.next(key); - while (valueStream != null && !key.toString() - .equals(containerId)) { - // Next container - key = new LogKey(); - valueStream = reader.next(key); - } - if (valueStream == null) { - continue; - } - while (true) { - try { - String fileType = valueStream.readUTF(); - String fileLengthStr = valueStream.readUTF(); - long fileLength = Long.parseLong(fileLengthStr); - if (fileType.equalsIgnoreCase(logFileName)) { - LogToolUtils.outputContainerLog(containerId, - nodeId, fileType, fileLength, outputSize, - Times.format(thisNodeFile.getModificationTime()), - valueStream, os, buf, - ContainerLogAggregationType.AGGREGATED); - StringBuilder sb = new StringBuilder(); - String endOfFile = "End of LogType:" + fileType; - sb.append("\n" + endOfFile + "\n"); - sb.append(StringUtils.repeat("*", endOfFile.length() + 50) - + "\n\n"); - byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); - os.write(b, 0, b.length); - findLogs = true; - } else { - long totalSkipped = 0; - long currSkipped = 0; - while (currSkipped != -1 && totalSkipped < fileLength) { - currSkipped = valueStream.skip( - fileLength - totalSkipped); - totalSkipped += currSkipped; - } - } - } catch (EOFException eof) { - break; - } - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - } - os.flush(); - return findLogs; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationFileFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationFileFormat.java index e49387c..36f2439 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationFileFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationFileFormat.java @@ -24,6 +24,10 @@ import com.google.common.collect.Sets; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -33,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,6 +54,10 @@ import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; /** * Base class to implement Log Aggregation File Format. @@ -171,6 +180,55 @@ public abstract void postWrite(final LogAggregationFileFormatContext record) throws Exception; /** + * Output container log. + * @param logRequest {@link ContainerLogsRequest} + * @param os the output stream + * @throws IOException if we can not access the log file. + */ + public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException; + + /** + * Return a list of {@link ContainerLogMeta} for an application + * from Remote FileSystem. + * + * @param logRequest {@link ContainerLogsRequest} + * @return a list of {@link ContainerLogMeta} + * @throws IOException if there is no available log file + */ + public abstract List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException; + + /** + * Render Aggregated Logs block + * @param html the html + * @param context the ViewContext + */ + public abstract void renderAggregatedLogsBlock(Block html, + ViewContext context); + + /** + * Returns the owner of the application. + * + * @param the aggregatedLog path. + * @return the application owner. + * @throws IOException + */ + public abstract String getApplicationOwner(Path aggregatedLogPath) + throws IOException; + + /** + * Returns ACLs for the application. An empty map is returned if no ACLs are + * found. + * + * @param the aggregatedLog path. + * @return a map of the Application ACLs. + * @throws IOException + */ + public abstract Map getApplicationAcls( + Path aggregatedLogPath) throws IOException; + + /** * Get the remote log file path to write. * @param context LogAggregationFileFormatContext * @return the remote log file path to write @@ -180,6 +238,25 @@ public Path getRemoteLogFileToWrite( return context.getRemoteNodeTmpLogFileForApp(); } + protected PrintStream createPrintStream(String localDir, String nodeId, + String containerId) throws IOException { + PrintStream out = System.out; + if(localDir != null && !localDir.isEmpty()) { + Path nodePath = new Path(localDir, LogAggregationUtils + .getNodeString(nodeId)); + Files.createDirectories(Paths.get(nodePath.toString())); + Path containerLogPath = new Path(nodePath, containerId); + out = new PrintStream(containerLogPath.toString(), "UTF-8"); + } + return out; + } + + protected void closePrintStream(OutputStream out) { + if (out != System.out) { + IOUtils.closeQuietly(out); + } + } + /** * Verify and create the remote log directory. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationTFileFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationTFileFormat.java index 5f2d5d6..d4ac711 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationTFileFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/fileformat/LogAggregationTFileFormat.java @@ -18,22 +18,57 @@ package org.apache.hadoop.yarn.logaggregation.fileformat; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; + +import com.google.inject.Inject; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.util.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HarFs; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; +import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; /** * The TFile log aggregation file format implementation. @@ -44,6 +79,7 @@ LogAggregationTFileFormat.class); private LogWriter writer; + private TFileLogReader reader = null; public LogAggregationTFileFormat(){} @@ -122,4 +158,438 @@ public Object run() throws Exception { + Times.format(record.getLogUploadTimeStamp()) + "\n"); } } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + boolean findLogs = false; + boolean createPrintStream = (os == null); + ApplicationId appId = logRequest.getAppId(); + String nodeId = logRequest.getNodeId(); + List logTypes = new ArrayList<>(); + if (logRequest.getLogTypes() != null && !logRequest + .getLogTypes().isEmpty()) { + logTypes.addAll(logRequest.getLogTypes()); + } + String containerIdStr = logRequest.getContainerId(); + boolean getAllContainers = (containerIdStr == null + || containerIdStr.isEmpty()); + long size = logRequest.getBytes(); + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner()); + byte[] buf = new byte[65535]; + while (nodeFiles != null && nodeFiles.hasNext()) { + final FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if ((nodeId == null || nodeName.contains(LogAggregationUtils + .getNodeString(nodeId))) && !nodeName.endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = null; + try { + reader = new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (getAllContainers || (key.toString().equals(containerIdStr))) { + if (createPrintStream) { + os = createPrintStream( + logRequest.getOutputLocalDir(), + thisNodeFile.getPath().getName(), key.toString()); + } + try { + while (true) { + try { + String fileType = valueStream.readUTF(); + String fileLengthStr = valueStream.readUTF(); + long fileLength = Long.parseLong(fileLengthStr); + if (logTypes == null || logTypes.isEmpty() || + logTypes.contains(fileType)) { + LogToolUtils.outputContainerLog(key.toString(), + nodeName, fileType, fileLength, size, + Times.format(thisNodeFile.getModificationTime()), + valueStream, os, buf, + ContainerLogAggregationType.AGGREGATED); + StringBuilder sb = new StringBuilder(); + String endOfFile = "End of LogType:" + fileType; + sb.append("\n" + endOfFile + "\n"); + sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + + "\n\n"); + byte[] b = sb.toString().getBytes(Charset.forName("UTF-8")); + os.write(b, 0, b.length); + findLogs = true; + } else { + long totalSkipped = 0; + long currSkipped = 0; + while (currSkipped != -1 && totalSkipped < fileLength) { + currSkipped = valueStream.skip( + fileLength - totalSkipped); + totalSkipped += currSkipped; + } + } + } catch (EOFException eof) { + break; + } + } + } finally { + os.flush(); + if (createPrintStream) { + closePrintStream(os); + } + } + if (!getAllContainers) { + break; + } + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + if (reader != null) { + reader.close(); + } + } + } + } + return findLogs; + } + + @Override + public List readAggregatedLogsMeta(ContainerLogsRequest logRequest) throws IOException { + List containersLogMeta = new ArrayList<>(); + String containerIdStr = logRequest.getContainerId(); + String nodeId = logRequest.getNodeId(); + ApplicationId appId = logRequest.getAppId(); + String appOwner = logRequest.getAppOwner(); + boolean getAllContainers = (containerIdStr == null); + String nodeIdStr = (nodeId == null) ? null + : LogAggregationUtils.getNodeString(nodeId); + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner); + if (nodeFiles == null) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + while (nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + if (nodeIdStr != null) { + if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) { + continue; + } + } + if (!thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + AggregatedLogFormat.LogReader reader = + new AggregatedLogFormat.LogReader(conf, + thisNodeFile.getPath()); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (getAllContainers || (key.toString().equals(containerIdStr))) { + ContainerLogMeta containerLogMeta = new ContainerLogMeta( + key.toString(), thisNodeFile.getPath().getName()); + while (true) { + try { + Pair logMeta = + LogReader.readContainerMetaDataAndSkipData( + valueStream); + containerLogMeta.addLogMeta( + logMeta.getFirst(), + logMeta.getSecond(), + Times.format(thisNodeFile.getModificationTime())); + } catch (EOFException eof) { + break; + } + } + containersLogMeta.add(containerLogMeta); + if (!getAllContainers) { + break; + } + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + } + } + return containersLogMeta; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(context); + block.render(html); + } + + @Override + public String getApplicationOwner(Path aggregatedLog) throws IOException { + createTFileLogReader(aggregatedLog); + return this.reader.getLogReader().getApplicationOwner(); + } + + @Override + public Map getApplicationAcls( + Path aggregatedLog) throws IOException { + createTFileLogReader(aggregatedLog); + return this.reader.getLogReader().getApplicationAcls(); + } + + private void createTFileLogReader(Path aggregatedLog) throws IOException { + if (this.reader == null || !this.reader.getAggregatedLogPath() + .equals(aggregatedLog)) { + LogReader logReader = new LogReader(conf, aggregatedLog); + this.reader = new TFileLogReader(logReader, aggregatedLog); + } + } + + private static class TFileLogReader { + private LogReader logReader; + private Path aggregatedLogPath; + public TFileLogReader(LogReader logReader, Path aggregatedLogPath) { + this.setLogReader(logReader); + this.setAggregatedLogPath(aggregatedLogPath); + } + public LogReader getLogReader() { + return logReader; + } + public void setLogReader(LogReader logReader) { + this.logReader = logReader; + } + public Path getAggregatedLogPath() { + return aggregatedLogPath; + } + public void setAggregatedLogPath(Path aggregatedLogPath) { + this.aggregatedLogPath = aggregatedLogPath; + } + } + + private class TFileAggregatedLogsBlock extends HtmlBlock { + + @Inject + public TFileAggregatedLogsBlock(ViewContext ctx) { + super(ctx); + } + + @Override + protected void render(Block html) { + ContainerId containerId = LogAggregationWebUtils + .verifyAndGetContainerId(html, $(CONTAINER_ID)); + NodeId nodeId = LogAggregationWebUtils + .verifyAndGetNodeId(html, $(NM_NODENAME)); + String appOwner = LogAggregationWebUtils + .verifyAndGetAppOwner(html, $(APP_OWNER)); + + boolean isValid = true; + long start = -4096; + try { + start = LogAggregationWebUtils.verifyAndGetLogStartIndex( + html, $("start")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("start")).__(); + isValid = false; + } + long end = Long.MAX_VALUE; + try { + end = LogAggregationWebUtils.verifyAndGetLogEndIndex( + html, $("end")); + }catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("end")).__(); + isValid = false; + } + + if (containerId == null || nodeId == null || appOwner == null + || appOwner.isEmpty() || !isValid) { + return; + } + + ApplicationId appId = containerId.getApplicationAttemptId() + .getApplicationId(); + String logEntity = $(ENTITY_STRING); + if (logEntity == null || logEntity.isEmpty()) { + logEntity = containerId.toString(); + } + + RemoteIterator nodeFiles; + try { + Path remoteAppDir = getRemoteAppLogDir(appId, appOwner); + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified( + remoteAppDir); + nodeFiles = + FileContext.getFileContext(qualifiedLogDir.toUri(), conf) + .listStatus(remoteAppDir); + } catch (Exception ex) { + return; + } + + boolean foundLog = false; + String desiredLogType = $(CONTAINER_LOG_TYPE); + try { + while (nodeFiles.hasNext()) { + AggregatedLogFormat.LogReader reader = null; + try { + FileStatus thisNodeFile = nodeFiles.next(); + if (thisNodeFile.getPath().getName().equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + if (!thisNodeFile.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + || thisNodeFile.getPath().getName() + .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { + continue; + } + long logUploadedTime = thisNodeFile.getModificationTime(); + reader = new AggregatedLogFormat.LogReader( + conf, thisNodeFile.getPath()); + + Map appAcls = null; + try { + appAcls = reader.getApplicationAcls(); + } catch (IOException e) { + LOG.error("Error getting logs for " + logEntity, e); + continue; + } + ApplicationACLsManager aclsManager = new ApplicationACLsManager( + conf); + aclsManager.addApplication(appId, appAcls); + + String remoteUser = request().getRemoteUser(); + UserGroupInformation callerUGI = null; + if (remoteUser != null) { + callerUGI = UserGroupInformation.createRemoteUser(remoteUser); + } + if (callerUGI != null && !aclsManager.checkAccess(callerUGI, + ApplicationAccessType.VIEW_APP, remoteUser, appId)) { + html.h1().__("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity + + " in log file [" + thisNodeFile.getPath().getName() + "]") + .__(); + LOG.error("User [" + remoteUser + + "] is not authorized to view the logs for " + logEntity); + continue; + } + + AggregatedLogFormat.ContainerLogsReader logReader = reader + .getContainerLogsReader(containerId); + if (logReader == null) { + continue; + } + + foundLog = readContainerLogs(html, logReader, start, end, + desiredLogType, logUploadedTime); + } catch (IOException ex) { + LOG.error("Error getting logs for " + logEntity, ex); + continue; + } finally { + if (reader != null) + reader.close(); + } + } + if (!foundLog) { + if (desiredLogType.isEmpty()) { + html.h1("No logs available for container " + containerId.toString()); + } else { + html.h1("Unable to locate '" + desiredLogType + + "' log for container " + containerId.toString()); + } + } + } catch (IOException e) { + html.h1().__("Error getting logs for " + logEntity).__(); + LOG.error("Error getting logs for " + logEntity, e); + } + } + + private boolean readContainerLogs(Block html, + AggregatedLogFormat.ContainerLogsReader logReader, long startIndex, + long endIndex, String desiredLogType, long logUpLoadTime) + throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + + boolean foundLog = false; + String logType = logReader.nextLog(); + while (logType != null) { + if (desiredLogType == null || desiredLogType.isEmpty() + || desiredLogType.equals(logType)) { + long logLength = logReader.getCurrentLogLength(); + if (foundLog) { + html.pre().__("\n\n").__(); + } + + html.p().__("Log Type: " + logType).__(); + html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__(); + html.p().__("Log Length: " + Long.toString(logLength)).__(); + + long start = startIndex < 0 + ? logLength + startIndex : startIndex; + start = start < 0 ? 0 : start; + start = start > logLength ? logLength : start; + long end = endIndex < 0 + ? logLength + endIndex : endIndex; + end = end < 0 ? 0 : end; + end = end > logLength ? logLength : end; + end = end < start ? start : end; + + long toRead = end - start; + if (toRead < logLength) { + html.p().__("Showing " + toRead + " bytes of " + logLength + + " total. Click ") + .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), + $(ENTITY_STRING), $(APP_OWNER), + logType, "?start=0"), "here"). + __(" for the full log.").__(); + } + + long totalSkipped = 0; + while (totalSkipped < start) { + long ret = logReader.skip(start - totalSkipped); + if (ret == 0) { + //Read one byte + int nextByte = logReader.read(); + // Check if we have reached EOF + if (nextByte == -1) { + throw new IOException( "Premature EOF from container log"); + } + ret = 1; + } + totalSkipped += ret; + } + + int len = 0; + int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + PRE pre = html.pre(); + + while (toRead > 0 + && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { + pre.__(new String(cbuf, 0, len)); + toRead = toRead - len; + currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; + } + + pre.__(); + foundLog = true; + } + + logType = logReader.nextLog(); + } + + return foundLog; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 0c7e09e..d664dbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -20,33 +20,19 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER; import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID; -import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE; import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING; import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME; import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Map; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.HarFs; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; -import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.util.Times; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet.PRE; +import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormat; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormatFactory; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import com.google.inject.Inject; @@ -55,20 +41,40 @@ public class AggregatedLogsBlock extends HtmlBlock { private final Configuration conf; + private final LogAggregationFileFormatFactory factory; @Inject AggregatedLogsBlock(Configuration conf) { this.conf = conf; + factory = new LogAggregationFileFormatFactory(conf); } @Override protected void render(Block html) { - ContainerId containerId = verifyAndGetContainerId(html); - NodeId nodeId = verifyAndGetNodeId(html); - String appOwner = verifyAndGetAppOwner(html); - LogLimits logLimits = verifyAndGetLogLimits(html); + ContainerId containerId = LogAggregationWebUtils + .verifyAndGetContainerId(html, $(CONTAINER_ID)); + NodeId nodeId = LogAggregationWebUtils + .verifyAndGetNodeId(html, $(NM_NODENAME)); + String appOwner = LogAggregationWebUtils + .verifyAndGetAppOwner(html, $(APP_OWNER)); + boolean isValid = true; + try { + LogAggregationWebUtils.verifyAndGetLogStartIndex( + html, $("start")); + } catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("start")).__(); + isValid = false; + } + try { + LogAggregationWebUtils.verifyAndGetLogEndIndex( + html, $("end")); + }catch (NumberFormatException ne) { + html.h1().__("Invalid log start value: " + $("end")).__(); + isValid = false; + } + if (containerId == null || nodeId == null || appOwner == null - || appOwner.isEmpty() || logLimits == null) { + || appOwner.isEmpty() || !isValid) { return; } @@ -93,20 +99,10 @@ protected void render(Block html) { return; } - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogDir, applicationId, appOwner, - LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)); - RemoteIterator nodeFiles; + LogAggregationFileFormat fileFormat; try { - Path qualifiedLogDir = - FileContext.getFileContext(conf).makeQualified( - remoteAppDir); - nodeFiles = - FileContext.getFileContext(qualifiedLogDir.toUri(), conf) - .listStatus(remoteAppDir); + fileFormat = this.factory.getFileFormatForRead( + applicationId, appOwner); } catch (FileNotFoundException fnf) { html.h1() .__("Logs not available for " + logEntity @@ -124,245 +120,7 @@ protected void render(Block html) { return; } - boolean foundLog = false; - String desiredLogType = $(CONTAINER_LOG_TYPE); - try { - while (nodeFiles.hasNext()) { - AggregatedLogFormat.LogReader reader = null; - try { - FileStatus thisNodeFile = nodeFiles.next(); - if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) { - Path p = new Path("har:///" - + thisNodeFile.getPath().toUri().getRawPath()); - nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); - continue; - } - if (!thisNodeFile.getPath().getName() - .contains(LogAggregationUtils.getNodeString(nodeId)) - || thisNodeFile.getPath().getName() - .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { - continue; - } - long logUploadedTime = thisNodeFile.getModificationTime(); - reader = - new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); - - String owner = null; - Map appAcls = null; - try { - owner = reader.getApplicationOwner(); - appAcls = reader.getApplicationAcls(); - } catch (IOException e) { - LOG.error("Error getting logs for " + logEntity, e); - continue; - } - ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf); - aclsManager.addApplication(applicationId, appAcls); - - String remoteUser = request().getRemoteUser(); - UserGroupInformation callerUGI = null; - if (remoteUser != null) { - callerUGI = UserGroupInformation.createRemoteUser(remoteUser); - } - if (callerUGI != null && !aclsManager.checkAccess(callerUGI, - ApplicationAccessType.VIEW_APP, owner, applicationId)) { - html.h1() - .__("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity - + " in log file [" + thisNodeFile.getPath().getName() + "]").__(); - LOG.error("User [" + remoteUser - + "] is not authorized to view the logs for " + logEntity); - continue; - } - - AggregatedLogFormat.ContainerLogsReader logReader = reader - .getContainerLogsReader(containerId); - if (logReader == null) { - continue; - } - - foundLog = readContainerLogs(html, logReader, logLimits, - desiredLogType, logUploadedTime); - } catch (IOException ex) { - LOG.error("Error getting logs for " + logEntity, ex); - continue; - } finally { - if (reader != null) { - reader.close(); - } - } - } - if (!foundLog) { - if (desiredLogType.isEmpty()) { - html.h1("No logs available for container " + containerId.toString()); - } else { - html.h1("Unable to locate '" + desiredLogType - + "' log for container " + containerId.toString()); - } - } - } catch (IOException e) { - html.h1().__("Error getting logs for " + logEntity).__(); - LOG.error("Error getting logs for " + logEntity, e); - } - } - - private boolean readContainerLogs(Block html, - AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits, - String desiredLogType, long logUpLoadTime) throws IOException { - int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; - - boolean foundLog = false; - String logType = logReader.nextLog(); - while (logType != null) { - if (desiredLogType == null || desiredLogType.isEmpty() - || desiredLogType.equals(logType)) { - long logLength = logReader.getCurrentLogLength(); - if (foundLog) { - html.pre().__("\n\n").__(); - } - - html.p().__("Log Type: " + logType).__(); - html.p().__("Log Upload Time: " + Times.format(logUpLoadTime)).__(); - html.p().__("Log Length: " + Long.toString(logLength)).__(); - - long start = logLimits.start < 0 - ? logLength + logLimits.start : logLimits.start; - start = start < 0 ? 0 : start; - start = start > logLength ? logLength : start; - long end = logLimits.end < 0 - ? logLength + logLimits.end : logLimits.end; - end = end < 0 ? 0 : end; - end = end > logLength ? logLength : end; - end = end < start ? start : end; - - long toRead = end - start; - if (toRead < logLength) { - html.p().__("Showing " + toRead + " bytes of " + logLength - + " total. Click ") - .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID), - $(ENTITY_STRING), $(APP_OWNER), - logType, "?start=0"), "here"). - __(" for the full log.").__(); - } - - long totalSkipped = 0; - while (totalSkipped < start) { - long ret = logReader.skip(start - totalSkipped); - if (ret == 0) { - //Read one byte - int nextByte = logReader.read(); - // Check if we have reached EOF - if (nextByte == -1) { - throw new IOException( "Premature EOF from container log"); - } - ret = 1; - } - totalSkipped += ret; - } - - int len = 0; - int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - PRE pre = html.pre(); - - while (toRead > 0 - && (len = logReader.read(cbuf, 0, currentToRead)) > 0) { - pre.__(new String(cbuf, 0, len)); - toRead = toRead - len; - currentToRead = toRead > bufferSize ? bufferSize : (int) toRead; - } - - pre.__(); - foundLog = true; - } - - logType = logReader.nextLog(); - } - - return foundLog; - } - - private ContainerId verifyAndGetContainerId(Block html) { - String containerIdStr = $(CONTAINER_ID); - if (containerIdStr == null || containerIdStr.isEmpty()) { - html.h1().__("Cannot get container logs without a ContainerId").__(); - return null; - } - ContainerId containerId = null; - try { - containerId = ContainerId.fromString(containerIdStr); - } catch (IllegalArgumentException e) { - html.h1() - .__("Cannot get container logs for invalid containerId: " - + containerIdStr).__(); - return null; - } - return containerId; - } - - private NodeId verifyAndGetNodeId(Block html) { - String nodeIdStr = $(NM_NODENAME); - if (nodeIdStr == null || nodeIdStr.isEmpty()) { - html.h1().__("Cannot get container logs without a NodeId").__(); - return null; - } - NodeId nodeId = null; - try { - nodeId = NodeId.fromString(nodeIdStr); - } catch (IllegalArgumentException e) { - html.h1().__("Cannot get container logs. Invalid nodeId: " + nodeIdStr) - .__(); - return null; - } - return nodeId; - } - - private String verifyAndGetAppOwner(Block html) { - String appOwner = $(APP_OWNER); - if (appOwner == null || appOwner.isEmpty()) { - html.h1().__("Cannot get container logs without an app owner").__(); - } - return appOwner; - } - - private static class LogLimits { - long start; - long end; - } - - private LogLimits verifyAndGetLogLimits(Block html) { - long start = -4096; - long end = Long.MAX_VALUE; - boolean isValid = true; - - String startStr = $("start"); - if (startStr != null && !startStr.isEmpty()) { - try { - start = Long.parseLong(startStr); - } catch (NumberFormatException e) { - isValid = false; - html.h1().__("Invalid log start value: " + startStr).__(); - } - } - - String endStr = $("end"); - if (endStr != null && !endStr.isEmpty()) { - try { - end = Long.parseLong(endStr); - } catch (NumberFormatException e) { - isValid = false; - html.h1().__("Invalid log end value: " + endStr).__(); - } - } - - if (!isValid) { - return null; - } - - LogLimits limits = new LogLimits(); - limits.start = start; - limits.end = end; - return limits; + fileFormat.renderAggregatedLogsBlock(html, this.context()); } private String getApplicationLogURL(ApplicationId applicationId) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/fileformat/TestLogAggregationFileFormatFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/fileformat/TestLogAggregationFileFormatFactory.java index ba48ada..d6c1082 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/fileformat/TestLogAggregationFileFormatFactory.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/fileformat/TestLogAggregationFileFormatFactory.java @@ -23,8 +23,10 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.OutputStream; import java.io.Writer; import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +37,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.webapp.View.ViewContext; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; import org.junit.Test; /** @@ -177,5 +183,34 @@ public void postWrite(LogAggregationFileFormatContext record) throws Exception { // Do Nothing } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + return false; + } + + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + return null; + } + + @Override + public String getApplicationOwner(Path aggregatedLogPath) + throws IOException { + return null; + } + + @Override + public Map getApplicationAcls( + Path aggregatedLogPath) throws IOException { + return null; + } + + @Override + public void renderAggregatedLogsBlock(Block html, ViewContext context) { + //DO NOTHING + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 6195199..18a3e7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -56,8 +57,9 @@ import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormatFactory; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; @@ -91,12 +93,14 @@ private static final Joiner JOINER = Joiner.on(""); private static final Joiner DOT_JOINER = Joiner.on(". "); private final Configuration conf; + private final LogAggregationFileFormatFactory factory; @Inject public AHSWebServices(ApplicationBaseProtocol appBaseProt, Configuration conf) { super(appBaseProt); this.conf = conf; + this.factory = new LogAggregationFileFormatFactory(conf); } @GET @@ -524,9 +528,17 @@ private StreamingOutput getStreamingOutput(final ApplicationId appId, @Override public void write(OutputStream os) throws IOException, WebApplicationException { - byte[] buf = new byte[65535]; - boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf, - appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setBytes(bytes); + request.setNodeId(nodeId); + Set logTypes = new HashSet<>(); + logTypes.add(logFile); + request.setLogTypes(logTypes); + boolean findLogs = factory.getFileFormatForRead(appId, appOwner) + .readAggregatedLogs(request, os); if (!findLogs) { os.write(("Can not find logs for container:" + containerIdStr).getBytes(Charset.forName("UTF-8"))); @@ -557,9 +569,13 @@ private Response getContainerLogMeta(ApplicationId appId, String appOwner, final String nodeId, final String containerIdStr, boolean emptyLocalContainerLogMeta) { try { - List containerLogMeta = LogToolUtils - .getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr, - nodeId, appOwner); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setNodeId(nodeId); + List containerLogMeta = factory.getFileFormatForRead( + appId, appOwner).readAggregatedLogsMeta(request); if (containerLogMeta.isEmpty()) { throw new NotFoundException( "Can not get log meta for container: " + containerIdStr); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java index 04a889f..9c644b9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java @@ -23,9 +23,10 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map.Entry; - +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.GET; @@ -55,8 +56,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.LogToolUtils; +import org.apache.hadoop.yarn.logaggregation.fileformat.LogAggregationFileFormatFactory; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -89,6 +92,7 @@ private static RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); private final String redirectWSUrl; + private final LogAggregationFileFormatFactory factory; private @javax.ws.rs.core.Context HttpServletRequest request; @@ -107,6 +111,8 @@ public NMWebServices(final Context nm, final ResourceView view, this.webapp = webapp; this.redirectWSUrl = this.nmContext.getConf().get( YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL); + this.factory = new LogAggregationFileFormatFactory( + this.nmContext.getConf()); } private void init() { @@ -261,10 +267,13 @@ public Response getContainerLogsInfo( Application app = this.nmContext.getApplications().get(appId); String appOwner = app == null ? null : app.getUser(); try { - List containerLogMeta = LogToolUtils - .getContainerLogMetaFromRemoteFS(this.nmContext.getConf(), - appId, containerIdStr, - this.nmContext.getNodeId().toString(), appOwner); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerIdStr); + request.setNodeId(this.nmContext.getNodeId().toString()); + List containerLogMeta = factory.getFileFormatForRead( + appId, appOwner).readAggregatedLogsMeta(request); if (!containerLogMeta.isEmpty()) { for (ContainerLogMeta logMeta : containerLogMeta) { containersLogsInfo.add(new ContainerLogsInfo(logMeta, @@ -450,10 +459,17 @@ public void write(OutputStream os) throws IOException, Application app = nmContext.getApplications().get(appId); String appOwner = app == null ? null : app.getUser(); try { - LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(), - appId, appOwner, containerId.toString(), - nmContext.getNodeId().toString(), outputFileName, bytes, - os, buf); + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(appId); + request.setAppOwner(appOwner); + request.setContainerId(containerId.toString()); + request.setNodeId(nmContext.getNodeId().toString()); + request.setBytes(bytes); + Set logTypes = new HashSet<>(); + logTypes.add(outputFileName); + request.setLogTypes(logTypes); + factory.getFileFormatForRead(appId, appOwner).readAggregatedLogs( + request, os); } catch (Exception ex) { // Something wrong when we try to access the aggregated log. if (LOG.isDebugEnabled()) {