diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index ca43fe6ad96..63f108927b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -70,7 +70,6 @@
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.Times;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
@@ -801,57 +800,9 @@ public static void readAcontainerLogs(DataInputStream valueStream,
 
   private static void readContainerLogs(DataInputStream valueStream,
       PrintStream out, long logUploadedTime, long bytes) throws IOException {
-    byte[] buf = new byte[65535];
-
-    String fileType = valueStream.readUTF();
-    String fileLengthStr = valueStream.readUTF();
-    long fileLength = Long.parseLong(fileLengthStr);
-    out.print("LogType:");
-    out.println(fileType);
-    if (logUploadedTime != -1) {
-      out.print("Log Upload Time:");
-      out.println(Times.format(logUploadedTime));
-    }
-    out.print("LogLength:");
-    out.println(fileLengthStr);
-    out.println("Log Contents:");
-
-    long toSkip = 0;
-    long totalBytesToRead = fileLength;
-    long skipAfterRead = 0;
-    if (bytes < 0) {
-      long absBytes = Math.abs(bytes);
-      if (absBytes < fileLength) {
-        toSkip = fileLength - absBytes;
-        totalBytesToRead = absBytes;
-      }
-      org.apache.hadoop.io.IOUtils.skipFully(
-          valueStream, toSkip);
-    } else {
-      if (bytes < fileLength) {
-        totalBytesToRead = bytes;
-        skipAfterRead = fileLength - bytes;
-      }
-    }
-
-    long curRead = 0;
-    long pendingRead = totalBytesToRead - curRead;
-    int toRead =
-        pendingRead > buf.length ? buf.length : (int) pendingRead;
-    int len = valueStream.read(buf, 0, toRead);
-    while (len != -1 && curRead < totalBytesToRead) {
-      out.write(buf, 0, len);
-      curRead += len;
-
-      pendingRead = totalBytesToRead - curRead;
-      toRead =
-          pendingRead > buf.length ? buf.length : (int) pendingRead;
-      len = valueStream.read(buf, 0, toRead);
-    }
-    org.apache.hadoop.io.IOUtils.skipFully(
-        valueStream, skipAfterRead);
-    out.println("\nEnd of LogType:" + fileType);
-    out.println("");
+    NormalContainerLogReader reader =
+        new NormalContainerLogReader(valueStream, out, logUploadedTime, bytes);
+    reader.doWork();
   }
 
   /**
@@ -922,73 +873,16 @@ public static int readContainerLogsForALogType(
    * @param valueStream the value stream
    * @param out the output print stream
    * @param logUploadedTime the log uploaded time stamp
-   * @param logType the given log type
+   * @param logTypes the given log types
    * @throws IOException if we can not read the container logs
    */
   public static int readContainerLogsForALogType(
       DataInputStream valueStream, PrintStream out, long logUploadedTime,
-      List<String> logType, long bytes) throws IOException {
-    byte[] buf = new byte[65535];
-
-    String fileType = valueStream.readUTF();
-    String fileLengthStr = valueStream.readUTF();
-    long fileLength = Long.parseLong(fileLengthStr);
-    if (logType.contains(fileType)) {
-      out.print("LogType:");
-      out.println(fileType);
-      if (logUploadedTime != -1) {
-        out.print("Log Upload Time:");
-        out.println(Times.format(logUploadedTime));
-      }
-      out.print("LogLength:");
-      out.println(fileLengthStr);
-      out.println("Log Contents:");
-
-      long toSkip = 0;
-      long totalBytesToRead = fileLength;
-      long skipAfterRead = 0;
-      if (bytes < 0) {
-        long absBytes = Math.abs(bytes);
-        if (absBytes < fileLength) {
-          toSkip = fileLength - absBytes;
-          totalBytesToRead = absBytes;
-        }
-
-        org.apache.hadoop.io.IOUtils.skipFully(
-            valueStream, toSkip);
-      } else {
-        if (bytes < fileLength) {
-          totalBytesToRead = bytes;
-          skipAfterRead = fileLength - bytes;
-        }
-      }
-
-      long curRead = 0;
-      long pendingRead = totalBytesToRead - curRead;
-      int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
-      int len = valueStream.read(buf, 0, toRead);
-      while (len != -1 && curRead < totalBytesToRead) {
-        out.write(buf, 0, len);
-        curRead += len;
-
-        pendingRead = totalBytesToRead - curRead;
-        toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
-        len = valueStream.read(buf, 0, toRead);
-      }
-      org.apache.hadoop.io.IOUtils.skipFully(
-          valueStream, skipAfterRead);
-      out.println("\nEnd of LogType:" + fileType);
-      out.println("");
-      return 0;
-    } else {
-      long totalSkipped = 0;
-      long currSkipped = 0;
-      while (currSkipped != -1 && totalSkipped < fileLength) {
-        currSkipped = valueStream.skip(fileLength - totalSkipped);
-        totalSkipped += currSkipped;
-      }
-      return -1;
-    }
+      List<String> logTypes, long bytes) throws IOException {
+    NormalContainerLogReader reader =
+        new NormalContainerLogReader(valueStream, out, logUploadedTime, bytes,
+            logTypes);
+    return reader.doWork();
   }
 
   @Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReaderBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReaderBase.java
new file mode 100644
index 00000000000..c70bf27a540
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReaderBase.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.io.IOUtils.skipFully;
+
+/**
+ * Shared plumbing for reading a container log file entry and copying a
+ * bounded window of it to an output stream.
+ *
+ * The window is selected by {@code outputSize}: a negative value means the
+ * last |outputSize| bytes, a non-negative value means the first outputSize
+ * bytes; bytes outside the window are skipped so the stream is left at the
+ * end of the current log file entry.
+ */
+public abstract class ContainerLogReaderBase {
+  /** Copy buffer shared by the read loop. */
+  protected final byte[] buf;
+  protected final InputStream valueStream;
+  protected final OutputStream out;
+  /** Requested window size in bytes; the sign selects head/tail mode. */
+  protected final long outputSize;
+  protected long fileLength;
+  protected String fileType;
+  protected String fileLengthStr;
+
+  ContainerLogReaderBase(InputStream valueStream,
+      OutputStream out, long outputSize) {
+    this(valueStream, out, outputSize, new byte[65535]);
+  }
+
+  ContainerLogReaderBase(InputStream valueStream,
+      OutputStream out, long outputSize, byte[] buf) {
+    this.valueStream = valueStream;
+    this.out = out;
+    this.outputSize = outputSize;
+    this.buf = buf;
+  }
+
+  /** Runs the read; returns 0 on success, -1 if the entry was skipped. */
+  abstract int doWork() throws IOException;
+
+  /** Writes the header that precedes the log content to {@link #out}. */
+  abstract void printContainerLogHeader() throws IOException;
+
+  /**
+   * Copies the selected window of the current file entry (of
+   * {@link #fileLength} bytes) from {@link #valueStream} to {@link #out}.
+   */
+  void readLogsAndPrintContentsToStream() throws IOException {
+    long toSkip = 0;
+    long totalBytesToRead = fileLength;
+    long skipAfterRead = 0;
+    if (outputSize < 0) {
+      // Tail mode: skip everything before the last |outputSize| bytes.
+      long absBytes = Math.abs(outputSize);
+      if (absBytes < fileLength) {
+        toSkip = fileLength - absBytes;
+        totalBytesToRead = absBytes;
+      }
+      skipFully(valueStream, toSkip);
+    } else {
+      // Head mode: read the first outputSize bytes, skip the remainder.
+      if (outputSize < fileLength) {
+        totalBytesToRead = outputSize;
+        skipAfterRead = fileLength - outputSize;
+      }
+    }
+
+    long curRead = 0;
+    long pendingRead = totalBytesToRead - curRead;
+    int toRead =
+        pendingRead > buf.length ? buf.length : (int) pendingRead;
+    int len = valueStream.read(buf, 0, toRead);
+
+    printContainerLogHeader();
+    while (len != -1 && curRead < totalBytesToRead) {
+      out.write(buf, 0, len);
+      curRead += len;
+      pendingRead = totalBytesToRead - curRead;
+      toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
+      len = valueStream.read(buf, 0, toRead);
+    }
+
+    // Leave the stream positioned at the end of this log file entry.
+    skipFully(valueStream, skipAfterRead);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index 3c56b0290d7..db7a40d3226 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -51,26 +51,6 @@ private LogToolUtils() {}
 
   public static final String CONTAINER_ON_NODE_PATTERN =
       "Container: %s on %s";
 
-  /**
-   * Formats the header of an aggregated log file.
-   */
-  private static byte[] formatContainerLogHeader(String containerId,
-      String nodeId, ContainerLogAggregationType logType, String fileName,
-      String lastModifiedTime, long fileLength) {
-    StringBuilder sb = new StringBuilder();
-    String containerStr = String.format(
-        LogToolUtils.CONTAINER_ON_NODE_PATTERN,
-        containerId, nodeId);
-    sb.append(containerStr + "\n")
-        .append("LogAggregationType: " + logType + "\n")
-        .append(StringUtils.repeat("=", containerStr.length()) + "\n")
-        .append("LogType:" + fileName + "\n")
-        .append("LogLastModifiedTime:" + lastModifiedTime + "\n")
-        .append("LogLength:" + fileLength + "\n")
-        .append("LogContents:\n");
-    return sb.toString().getBytes(Charset.forName("UTF-8"));
-  }
-
   /**
    * Output container log.
@@ -89,81 +69,22 @@ public static void outputContainerLog(String containerId, String nodeId,
       String fileName, long fileLength, long outputSize,
       String lastModifiedTime, InputStream fis, OutputStream os,
       byte[] buf, ContainerLogAggregationType logType) throws IOException {
-    long toSkip = 0;
-    long totalBytesToRead = fileLength;
-    long skipAfterRead = 0;
-    if (outputSize < 0) {
-      long absBytes = Math.abs(outputSize);
-      if (absBytes < fileLength) {
-        toSkip = fileLength - absBytes;
-        totalBytesToRead = absBytes;
-      }
-      org.apache.hadoop.io.IOUtils.skipFully(fis, toSkip);
-    } else {
-      if (outputSize < fileLength) {
-        totalBytesToRead = outputSize;
-        skipAfterRead = fileLength - outputSize;
-      }
-    }
-
-    long curRead = 0;
-    long pendingRead = totalBytesToRead - curRead;
-    int toRead = pendingRead > buf.length ? buf.length
-        : (int) pendingRead;
-    int len = fis.read(buf, 0, toRead);
-    boolean keepGoing = (len != -1 && curRead < totalBytesToRead);
-
-    byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName,
-        lastModifiedTime, fileLength);
-    os.write(b, 0, b.length);
-    while (keepGoing) {
-      os.write(buf, 0, len);
-      curRead += len;
-
-      pendingRead = totalBytesToRead - curRead;
-      toRead = pendingRead > buf.length ? buf.length
-          : (int) pendingRead;
-      len = fis.read(buf, 0, toRead);
-      keepGoing = (len != -1 && curRead < totalBytesToRead);
-    }
-    org.apache.hadoop.io.IOUtils.skipFully(fis, skipAfterRead);
-    os.flush();
+    SecondContainerLogReader logReader =
+        new SecondContainerLogReader(containerId, nodeId, fileName, fileLength,
+            outputSize, lastModifiedTime, fis, os, buf, logType);
+    logReader.doWork();
   }
-
-  public static void outputContainerLogThroughZeroCopy(String containerId,
-      String nodeId, String fileName, long fileLength, long outputSize,
-      String lastModifiedTime, FileInputStream fis, OutputStream os,
-      ContainerLogAggregationType logType) throws IOException {
-    long toSkip = 0;
-    long totalBytesToRead = fileLength;
-    if (outputSize < 0) {
-      long absBytes = Math.abs(outputSize);
-      if (absBytes < fileLength) {
-        toSkip = fileLength - absBytes;
-        totalBytesToRead = absBytes;
-      }
-    } else {
-      if (outputSize < fileLength) {
-        totalBytesToRead = outputSize;
-      }
-    }
-
-    // output log summary
-    byte[] b = formatContainerLogHeader(containerId, nodeId, logType, fileName,
-        lastModifiedTime, fileLength);
-    os.write(b, 0, b.length);
-
-    if (totalBytesToRead > 0) {
-      // output log content
-      FileChannel inputChannel = fis.getChannel();
-      WritableByteChannel outputChannel = Channels.newChannel(os);
-      long position = toSkip;
-      while (totalBytesToRead > 0) {
-        long transferred =
-            inputChannel.transferTo(position, totalBytesToRead, outputChannel);
-        totalBytesToRead -= transferred;
-        position += transferred;
-      }
-      os.flush();
-    }
-  }
+
+  /**
+   * @deprecated kept for compatibility; delegates to
+   * {@code SecondContainerLogReader#outputContainerLogThroughZeroCopy}.
+   */
+  @Deprecated
+  public static void outputContainerLogThroughZeroCopy(String containerId,
+      String nodeId, String fileName, long fileLength, long outputSize,
+      String lastModifiedTime, FileInputStream fis, OutputStream os,
+      ContainerLogAggregationType logType) throws IOException {
+    new SecondContainerLogReader(containerId, nodeId, fileName, fileLength,
+        outputSize, lastModifiedTime, fis, os, logType)
+        .outputContainerLogThroughZeroCopy();
+  }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/NormalContainerLogReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/NormalContainerLogReader.java
new file mode 100644
index 00000000000..7b52cebec30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/NormalContainerLogReader.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.yarn.util.Times;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Reads one aggregated-log entry (type, length, content) from a
+ * {@link DataInputStream} and prints it to a {@link PrintStream}.
+ * When a log-type filter is supplied, non-matching entries are skipped.
+ */
+class NormalContainerLogReader extends ContainerLogReaderBase {
+  private final long logUploadedTime;
+  /** Optional log-type filter; {@code null} means print every type. */
+  private List<String> logTypes;
+
+  NormalContainerLogReader(DataInputStream valueStream,
+      PrintStream out, long logUploadedTime, long bytes) {
+    super(valueStream, out, bytes);
+    this.logUploadedTime = logUploadedTime;
+  }
+
+  NormalContainerLogReader(DataInputStream valueStream,
+      PrintStream out, long logUploadedTime, long bytes,
+      List<String> logTypes) {
+    super(valueStream, out, bytes);
+    this.logUploadedTime = logUploadedTime;
+    this.logTypes = logTypes;
+  }
+
+  @Override
+  int doWork() throws IOException {
+    this.fileType = ((DataInputStream) valueStream).readUTF();
+    this.fileLengthStr = ((DataInputStream) valueStream).readUTF();
+    this.fileLength = Long.parseLong(fileLengthStr);
+
+    if (logTypes == null || logTypes.contains(fileType)) {
+      doWorkInternal();
+      return 0;
+    }
+    skipFromValueStreamToEnd();
+    return -1;
+  }
+
+  /** Prints header, windowed contents and trailer for the current entry. */
+  private void doWorkInternal() throws IOException {
+    printHeader();
+    readLogsAndPrintContentsToStream();
+    printEndOfLogType();
+  }
+
+  /** Skips the whole content of a non-matching log file entry. */
+  private void skipFromValueStreamToEnd() throws IOException {
+    long totalSkipped = 0;
+    long currSkipped = 0;
+    while (currSkipped != -1 && totalSkipped < fileLength) {
+      currSkipped = valueStream.skip(fileLength - totalSkipped);
+      totalSkipped += currSkipped;
+    }
+  }
+
+  @Override
+  void printContainerLogHeader() {
+    ((PrintStream) out).println("Log Contents:");
+  }
+
+  private void printHeader() {
+    PrintStream ps = (PrintStream) out;
+    ps.print("LogType:");
+    ps.println(fileType);
+    if (logUploadedTime != -1) {
+      ps.print("Log Upload Time:");
+      ps.println(Times.format(logUploadedTime));
+    }
+    ps.print("LogLength:");
+    ps.println(fileLengthStr);
+  }
+
+  private void printEndOfLogType() {
+    PrintStream ps = (PrintStream) out;
+    ps.println("\nEnd of LogType:" + fileType);
+    ps.println("");
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/SecondContainerLogReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/SecondContainerLogReader.java
new file mode 100644
index 00000000000..fa0b0ba9f18
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/SecondContainerLogReader.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+
+/**
+ * Writes a single container log file, preceded by a formatted header, to
+ * an output stream; supports buffered copying as well as zero-copy
+ * transfer from a {@link FileInputStream}.
+ */
+public class SecondContainerLogReader extends ContainerLogReaderBase {
+  private final String containerId;
+  private final String nodeId;
+  private final String fileName;
+  private final String lastModifiedTime;
+  private final ContainerLogAggregationType logType;
+
+  public SecondContainerLogReader(String containerId, String nodeId,
+      String fileName, long fileLength, long outputSize,
+      String lastModifiedTime, InputStream fis, OutputStream os,
+      byte[] buf, ContainerLogAggregationType logType) {
+    super(fis, os, outputSize, buf);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.fileName = fileName;
+    // Assigns the inherited field so the base read loop sees the length.
+    this.fileLength = fileLength;
+    this.lastModifiedTime = lastModifiedTime;
+    this.logType = logType;
+  }
+
+  public SecondContainerLogReader(String containerId, String nodeId,
+      String fileName, long fileLength, long outputSize,
+      String lastModifiedTime, InputStream fis, OutputStream os,
+      ContainerLogAggregationType logType) {
+    super(fis, os, outputSize);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.fileName = fileName;
+    this.fileLength = fileLength;
+    this.lastModifiedTime = lastModifiedTime;
+    this.logType = logType;
+  }
+
+  @Override
+  int doWork() throws IOException {
+    readLogsAndPrintContentsToStream();
+    // LogToolUtils#outputContainerLog always flushed before returning.
+    out.flush();
+    return 0;
+  }
+
+  /** Zero-copy variant of {@link #doWork()}. */
+  int doWorkZeroCopy() throws IOException {
+    outputContainerLogThroughZeroCopy();
+    out.flush();
+    return 0;
+  }
+
+  @Override
+  void printContainerLogHeader() throws IOException {
+    byte[] b = formatContainerLogHeader(containerId, nodeId, logType,
+        fileName, lastModifiedTime, fileLength);
+    out.write(b, 0, b.length);
+  }
+
+  /**
+   * Streams the selected window of the file via FileChannel#transferTo,
+   * avoiding a copy through user space.
+   */
+  public void outputContainerLogThroughZeroCopy() throws IOException {
+    long toSkip = 0;
+    long totalBytesToRead = fileLength;
+    if (outputSize < 0) {
+      long absBytes = Math.abs(outputSize);
+      if (absBytes < fileLength) {
+        toSkip = fileLength - absBytes;
+        totalBytesToRead = absBytes;
+      }
+    } else {
+      if (outputSize < fileLength) {
+        totalBytesToRead = outputSize;
+      }
+    }
+
+    // output log summary
+    printContainerLogHeader();
+    if (totalBytesToRead > 0) {
+      // output log content
+      FileChannel inputChannel = ((FileInputStream) valueStream).getChannel();
+      WritableByteChannel outputChannel = Channels.newChannel(out);
+      long position = toSkip;
+      while (totalBytesToRead > 0) {
+        long transferred =
+            inputChannel.transferTo(position, totalBytesToRead, outputChannel);
+        totalBytesToRead -= transferred;
+        position += transferred;
+      }
+      out.flush();
+    }
+  }
+
+  /**
+   * Formats the header of an aggregated log file.
+   */
+  private static byte[] formatContainerLogHeader(String containerId,
+      String nodeId, ContainerLogAggregationType logType, String fileName,
+      String lastModifiedTime, long fileLength) {
+    StringBuilder sb = new StringBuilder();
+    String containerStr = String.format(
+        LogToolUtils.CONTAINER_ON_NODE_PATTERN,
+        containerId, nodeId);
+    sb.append(containerStr + "\n")
+        .append("LogAggregationType: " + logType + "\n")
+        .append(StringUtils.repeat("=", containerStr.length()) + "\n")
+        .append("LogType:" + fileName + "\n")
+        .append("LogLastModifiedTime:" + lastModifiedTime + "\n")
+        .append("LogLength:" + fileLength + "\n")
+        .append("LogContents:\n");
+    return sb.toString().getBytes(Charset.forName("UTF-8"));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index d485c55bc0e..782b2ab4b4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -30,6 +30,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.logaggregation.SecondContainerLogReader;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
@@ -475,10 +476,12 @@ public Response getLogs(
       public void write(OutputStream os) throws IOException,
           WebApplicationException {
         try {
-          LogToolUtils.outputContainerLogThroughZeroCopy(
-              containerId.toString(), nmContext.getNodeId().toString(),
-              outputFileName, fileLength, bytes, lastModifiedTime, fis, os,
-              ContainerLogAggregationType.LOCAL);
+          SecondContainerLogReader containerLogReader =
+              new SecondContainerLogReader(containerId.toString(),
+                  nmContext.getNodeId().toString(), outputFileName,
+                  fileLength, bytes, lastModifiedTime, fis, os,
+                  ContainerLogAggregationType.LOCAL);
+          containerLogReader.outputContainerLogThroughZeroCopy();
           StringBuilder sb = new StringBuilder();
           String endOfFile = "End of LogType:" + outputFileName;
           sb.append(endOfFile + ".");