diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index ca43fe6ad96..81a6450af72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -801,57 +801,9 @@ public static void readAcontainerLogs(DataInputStream valueStream,
 
   private static void readContainerLogs(DataInputStream valueStream,
       PrintStream out, long logUploadedTime, long bytes) throws IOException {
-    byte[] buf = new byte[65535];
-
-    String fileType = valueStream.readUTF();
-    String fileLengthStr = valueStream.readUTF();
-    long fileLength = Long.parseLong(fileLengthStr);
-    out.print("LogType:");
-    out.println(fileType);
-    if (logUploadedTime != -1) {
-      out.print("Log Upload Time:");
-      out.println(Times.format(logUploadedTime));
-    }
-    out.print("LogLength:");
-    out.println(fileLengthStr);
-    out.println("Log Contents:");
-
-    long toSkip = 0;
-    long totalBytesToRead = fileLength;
-    long skipAfterRead = 0;
-    if (bytes < 0) {
-      long absBytes = Math.abs(bytes);
-      if (absBytes < fileLength) {
-        toSkip = fileLength - absBytes;
-        totalBytesToRead = absBytes;
-      }
-      org.apache.hadoop.io.IOUtils.skipFully(
-          valueStream, toSkip);
-    } else {
-      if (bytes < fileLength) {
-        totalBytesToRead = bytes;
-        skipAfterRead = fileLength - bytes;
-      }
-    }
-
-    long curRead = 0;
-    long pendingRead = totalBytesToRead - curRead;
-    int toRead =
-        pendingRead > buf.length ? buf.length : (int) pendingRead;
-    int len = valueStream.read(buf, 0, toRead);
-    while (len != -1 && curRead < totalBytesToRead) {
-      out.write(buf, 0, len);
-      curRead += len;
-
-      pendingRead = totalBytesToRead - curRead;
-      toRead =
-          pendingRead > buf.length ? buf.length : (int) pendingRead;
-      len = valueStream.read(buf, 0, toRead);
-    }
-    org.apache.hadoop.io.IOUtils.skipFully(
-        valueStream, skipAfterRead);
-    out.println("\nEnd of LogType:" + fileType);
-    out.println("");
+    ContainerLogReader reader =
+        new ContainerLogReader(valueStream, out, logUploadedTime, bytes);
+    reader.doWork();
   }
 
   /**
@@ -922,72 +874,16 @@ public static int readContainerLogsForALogType(
    * @param valueStream the value stream
    * @param out the output print stream
    * @param logUploadedTime the log uploaded time stamp
-   * @param logType the given log type
+   * @param logTypes the given log types
    * @throws IOException if we can not read the container logs
    */
-  public static int readContainerLogsForALogType(
+  static int readContainerLogsForALogType(
       DataInputStream valueStream, PrintStream out, long logUploadedTime,
-      List<String> logType, long bytes) throws IOException {
-    byte[] buf = new byte[65535];
-
-    String fileType = valueStream.readUTF();
-    String fileLengthStr = valueStream.readUTF();
-    long fileLength = Long.parseLong(fileLengthStr);
-    if (logType.contains(fileType)) {
-      out.print("LogType:");
-      out.println(fileType);
-      if (logUploadedTime != -1) {
-        out.print("Log Upload Time:");
-        out.println(Times.format(logUploadedTime));
-      }
-      out.print("LogLength:");
-      out.println(fileLengthStr);
-      out.println("Log Contents:");
-
-      long toSkip = 0;
-      long totalBytesToRead = fileLength;
-      long skipAfterRead = 0;
-      if (bytes < 0) {
-        long absBytes = Math.abs(bytes);
-        if (absBytes < fileLength) {
-          toSkip = fileLength - absBytes;
-          totalBytesToRead = absBytes;
-        }
-        org.apache.hadoop.io.IOUtils.skipFully(
-            valueStream, toSkip);
-      } else {
-        if (bytes < fileLength) {
-          totalBytesToRead = bytes;
-          skipAfterRead = fileLength - bytes;
-        }
-      }
-
-      long curRead = 0;
-      long pendingRead = totalBytesToRead - curRead;
-      int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
-      int len = valueStream.read(buf, 0, toRead);
-      while (len != -1 && curRead < totalBytesToRead) {
-        out.write(buf, 0, len);
-        curRead += len;
-
-        pendingRead = totalBytesToRead - curRead;
-        toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
-        len = valueStream.read(buf, 0, toRead);
-      }
-      org.apache.hadoop.io.IOUtils.skipFully(
-          valueStream, skipAfterRead);
-      out.println("\nEnd of LogType:" + fileType);
-      out.println("");
-      return 0;
-    } else {
-      long totalSkipped = 0;
-      long currSkipped = 0;
-      while (currSkipped != -1 && totalSkipped < fileLength) {
-        currSkipped = valueStream.skip(fileLength - totalSkipped);
-        totalSkipped += currSkipped;
-      }
-      return -1;
-    }
+      List<String> logTypes, long bytes) throws IOException {
+    ContainerLogReader reader =
+        new ContainerLogReader(valueStream, out, logUploadedTime, bytes);
+    reader.setLogTypes(logTypes);
+    return reader.doWork();
   }
 
   @Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReader.java
new file mode 100644
index 00000000000..1bfaff74a25
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogReader.java
@@ -0,0 +1,126 @@
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.yarn.util.Times;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import static org.apache.hadoop.io.IOUtils.skipFully;
+
+/**
+ * Reads a single container log file from an aggregated log value stream
+ * and prints its header and contents to the given output stream.
+ */
+class ContainerLogReader {
+  private final byte[] buf;
+  private final DataInputStream valueStream;
+  private final PrintStream out;
+  private final long logUploadedTime;
+  private final long bytes;
+  private long fileLength;
+  private String fileType;
+  private List<String> logTypes;
+  private String fileLengthStr;
+
+  ContainerLogReader(DataInputStream valueStream,
+      PrintStream out, long logUploadedTime, long bytes) {
+    this.valueStream = valueStream;
+    this.out = out;
+    this.logUploadedTime = logUploadedTime;
+    this.bytes = bytes;
+    this.buf = new byte[65535];
+  }
+
+  /**
+   * Restricts printing to the given log types; if never called, every
+   * log type is printed.
+   */
+  void setLogTypes(List<String> logTypes) {
+    this.logTypes = logTypes;
+  }
+
+  /**
+   * Reads one log file from the value stream and prints it.
+   *
+   * @return 0 if the log was printed, -1 if it was skipped because its
+   *         type is not among the requested log types
+   */
+  int doWork() throws IOException {
+    this.fileType = valueStream.readUTF();
+    this.fileLengthStr = valueStream.readUTF();
+    this.fileLength = Long.parseLong(fileLengthStr);
+
+    if (logTypes != null && !logTypes.contains(fileType)) {
+      skipFromValueStreamToEnd();
+      return -1;
+    }
+    printHeader();
+    readLogsAndPrintContentsToStream();
+    return 0;
+  }
+
+  private void printHeader() {
+    out.print("LogType:");
+    out.println(fileType);
+
+    if (logUploadedTime != -1) {
+      out.print("Log Upload Time:");
+      out.println(Times.format(logUploadedTime));
+    }
+
+    out.print("LogLength:");
+    out.println(fileLengthStr);
+  }
+
+  private void readLogsAndPrintContentsToStream() throws IOException {
+    out.println("Log Contents:");
+    // A negative "bytes" prints the last |bytes| bytes of the log, a
+    // non-negative value prints the first "bytes" bytes; whatever is
+    // not printed is skipped so the stream ends up positioned at the
+    // start of the next log file.
+    long toSkip = 0;
+    long totalBytesToRead = fileLength;
+    long skipAfterRead = 0;
+    if (bytes < 0) {
+      long absBytes = Math.abs(bytes);
+      if (absBytes < fileLength) {
+        toSkip = fileLength - absBytes;
+        totalBytesToRead = absBytes;
+      }
+      skipFully(valueStream, toSkip);
+    } else {
+      if (bytes < fileLength) {
+        totalBytesToRead = bytes;
+        skipAfterRead = fileLength - bytes;
+      }
+    }
+
+    long curRead = 0;
+    long pendingRead = totalBytesToRead - curRead;
+    int toRead =
+        pendingRead > buf.length ? buf.length : (int) pendingRead;
+    int len = valueStream.read(buf, 0, toRead);
+    while (len != -1 && curRead < totalBytesToRead) {
+      out.write(buf, 0, len);
+      curRead += len;
+      pendingRead = totalBytesToRead - curRead;
+      toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
+      len = valueStream.read(buf, 0, toRead);
+    }
+
+    skipFully(valueStream, skipAfterRead);
+    out.println("\nEnd of LogType:" + fileType);
+    out.println();
+  }
+
+  private void skipFromValueStreamToEnd() throws IOException {
+    long totalSkipped = 0;
+    long currSkipped = 0;
+    while (currSkipped != -1 && totalSkipped < fileLength) {
+      currSkipped = valueStream.skip(fileLength - totalSkipped);
+      totalSkipped += currSkipped;
+    }
+  }
+}
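
For reference, a minimal harness (not part of the patch) that exercises the refactored reader end to end. The class name and the sample "stderr" entry are invented for illustration; the entry layout it writes -- a UTF file type, a UTF length string, then the raw log bytes -- mirrors exactly what doWork() reads back, and the harness sits in the same package because ContainerLogReader is package-private.

    package org.apache.hadoop.yarn.logaggregation;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    // Hypothetical demo, not part of the patch: encodes one log entry in
    // the layout doWork() consumes and drives ContainerLogReader over it.
    public class ContainerLogReaderDemo {
      public static void main(String[] args) throws Exception {
        byte[] body = "hello from stderr\n".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream entry = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(entry);
        dos.writeUTF("stderr");                       // fileType
        dos.writeUTF(String.valueOf(body.length));    // fileLengthStr
        dos.write(body);                              // log contents

        DataInputStream valueStream =
            new DataInputStream(new ByteArrayInputStream(entry.toByteArray()));

        // logUploadedTime of -1 suppresses the "Log Upload Time:" line;
        // Long.MAX_VALUE as the byte limit prints the entry in full.
        ContainerLogReader reader =
            new ContainerLogReader(valueStream, System.out, -1, Long.MAX_VALUE);
        reader.setLogTypes(Collections.singletonList("stderr"));
        int rc = reader.doWork();  // 0: printed; -1: type filtered out
        System.out.println("rc=" + rc);
      }
    }

Passing a negative byte limit instead (for example -4096) would print only the tail of the entry, matching the tail-reading behavior of the original AggregatedLogFormat methods.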