diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 2e9e92d..53c76fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -227,7 +227,7 @@ public void testFetchApplictionLogs() throws Exception { assertTrue(exitCode == 0); assertTrue(sysOutStream.toString().contains( "Hello container_0_0001_01_000001!")); - assertTrue(sysOutStream.toString().contains("LogUploadTime")); + assertTrue(sysOutStream.toString().contains("Log Upload Time")); fs.delete(new Path(remoteLogRootDir), true); fs.delete(new Path(rootLogDir), true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 22219be..a3728ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; +import java.io.PrintWriter; import java.io.Writer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -233,9 +234,6 @@ public void write(DataOutputStream out, Set pendingUploadFiles) // Write the logFile Type out.writeUTF(logFile.getName()); - // Write the uploaded TimeStamp - out.writeLong(System.currentTimeMillis()); - // Write the log length as UTF so that it is printable out.writeUTF(String.valueOf(fileLength)); @@ -639,44 +637,61 @@ public ContainerLogsReader getContainerLogsReader( * Writes all logs for a single container to the provided writer. * @param valueStream * @param writer + * @param logUploadedTime * @throws IOException */ public static void readAcontainerLogs(DataInputStream valueStream, - Writer writer) throws IOException { - int bufferSize = 65536; - char[] cbuf = new char[bufferSize]; - String fileType; - long uploadTime; - String fileLengthStr; - long fileLength; - + Writer writer, long logUploadedTime) throws IOException { while (true) { try { - fileType = valueStream.readUTF(); + readContainerLogs(valueStream, writer, logUploadedTime); } catch (EOFException e) { // EndOfFile return; } - uploadTime = valueStream.readLong(); - fileLengthStr = valueStream.readUTF(); - fileLength = Long.parseLong(fileLengthStr); - writer.write("\n\nLogType:"); - writer.write(fileType); - writer.write("\nLogUploadTime:"); - writer.write(String.valueOf(uploadTime)); - writer.write("\nLogLength:"); - writer.write(fileLengthStr); - writer.write("\nLog Contents:\n"); - // ByteLevel - BoundedInputStream bis = - new BoundedInputStream(valueStream, fileLength); - InputStreamReader reader = new InputStreamReader(bis); - int currentRead = 0; - int totalRead = 0; - while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) { - writer.write(cbuf, 0, currentRead); - totalRead += currentRead; - } + } + } + + /** + * Writes all logs for a single container to the provided writer. + * @param valueStream + * @param writer + * @throws IOException + */ + public static void readAcontainerLogs(DataInputStream valueStream, + Writer writer) throws IOException { + readAcontainerLogs(valueStream, writer, -1); + } + + private static void readContainerLogs(DataInputStream valueStream, + Writer writer, long logUploadedTime) throws IOException { + int bufferSize = 65536; + char[] cbuf = new char[bufferSize]; + String fileType; + String fileLengthStr; + long fileLength; + fileType = valueStream.readUTF(); + fileLengthStr = valueStream.readUTF(); + fileLength = Long.parseLong(fileLengthStr); + writer.write("\n\nLogType:"); + writer.write(fileType); + if (logUploadedTime != -1) { + writer.write("\nLog Upload Time:"); + writer.write(Times.format(logUploadedTime)); + } + writer.write("\nLogLength:"); + writer.write(fileLengthStr); + writer.flush(); + writer.write("\nLog Contents:\n"); + // ByteLevel + BoundedInputStream bis = new BoundedInputStream(valueStream, fileLength); + InputStreamReader reader = new InputStreamReader(bis); + int currentRead = 0; + int totalRead = 0; + while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) { + writer.write(cbuf, 0, currentRead); + writer.flush(); + totalRead += currentRead; } } @@ -686,41 +701,29 @@ public static void readAcontainerLogs(DataInputStream valueStream, * * @param valueStream * @param out + * @param logUploadedTime * @throws IOException */ public static void readAContainerLogsForALogType( - DataInputStream valueStream, PrintStream out) + DataInputStream valueStream, PrintStream out, long logUploadedTime) throws IOException { - byte[] buf = new byte[65535]; - - String fileType = valueStream.readUTF(); - long uploadTime = valueStream.readLong(); - String fileLengthStr = valueStream.readUTF(); - long fileLength = Long.parseLong(fileLengthStr); - out.print("LogType: "); - out.println(fileType); - out.print("LogUploadTime: "); - out.println(Times.format(uploadTime)); - out.print("LogLength: "); - out.println(fileLengthStr); - out.println("Log Contents:"); - - long curRead = 0; - long pendingRead = fileLength - curRead; - int toRead = - pendingRead > buf.length ? buf.length : (int) pendingRead; - int len = valueStream.read(buf, 0, toRead); - while (len != -1 && curRead < fileLength) { - out.write(buf, 0, len); - curRead += len; - - pendingRead = fileLength - curRead; - toRead = - pendingRead > buf.length ? buf.length : (int) pendingRead; - len = valueStream.read(buf, 0, toRead); - } - out.println(""); + PrintWriter writer = new PrintWriter(out); + readContainerLogs(valueStream, writer, logUploadedTime); + } + + /** + * Keep calling this till you get a {@link EOFException} for getting logs of + * all types for a single container. + * + * @param valueStream + * @param out + * @throws IOException + */ + public static void readAContainerLogsForALogType( + DataInputStream valueStream, PrintStream out) + throws IOException { + readAContainerLogsForALogType(valueStream, out, -1); } public void close() { @@ -732,7 +735,6 @@ public void close() { public static class ContainerLogsReader { private DataInputStream valueStream; private String currentLogType = null; - private long currentLogUpLoadTime = 0; private long currentLogLength = 0; private BoundedInputStream currentLogData = null; private InputStreamReader currentLogISR; @@ -753,14 +755,12 @@ public String nextLog() throws IOException { } currentLogType = null; - currentLogUpLoadTime = 0; currentLogLength = 0; currentLogData = null; currentLogISR = null; try { String logType = valueStream.readUTF(); - long logUpLoadTime = valueStream.readLong(); String logLengthStr = valueStream.readUTF(); currentLogLength = Long.parseLong(logLengthStr); currentLogData = @@ -768,7 +768,6 @@ public String nextLog() throws IOException { currentLogData.setPropagateClose(false); currentLogISR = new InputStreamReader(currentLogData); currentLogType = logType; - currentLogUpLoadTime = logUpLoadTime; } catch (EOFException e) { } @@ -779,10 +778,6 @@ public String getCurrentLogType() { return currentLogType; } - public long getCurrentLogUpLoadTime() { - return currentLogUpLoadTime; - } - public long getCurrentLogLength() { return currentLogLength; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index de06d48..daf0c22 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -78,7 +78,8 @@ public int dumpAContainersLogs(String appId, String containerId, reader = new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath()); - if (dumpAContainerLogs(containerId, reader, System.out) > -1) { + if (dumpAContainerLogs(containerId, reader, System.out, + thisNodeFile.getModificationTime()) > -1) { foundContainerLogs = true; } } finally { @@ -97,7 +98,8 @@ public int dumpAContainersLogs(String appId, String containerId, @Private public int dumpAContainerLogs(String containerIdStr, - AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException { + AggregatedLogFormat.LogReader reader, PrintStream out, + long logUploadedTime) throws IOException { DataInputStream valueStream; LogKey key = new LogKey(); valueStream = reader.next(key); @@ -114,7 +116,8 @@ public int dumpAContainerLogs(String containerIdStr, while (true) { try { - LogReader.readAContainerLogsForALogType(valueStream, out); + LogReader.readAContainerLogsForALogType(valueStream, out, + logUploadedTime); } catch (EOFException eof) { break; } @@ -163,7 +166,8 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner, out.println(StringUtils.repeat("=", containerString.length())); while (true) { try { - LogReader.readAContainerLogsForALogType(valueStream, out); + LogReader.readAContainerLogsForALogType(valueStream, out, + thisNodeFile.getModificationTime()); foundAnyLogs = true; } catch (EOFException eof) { break; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index bba3258..102df92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -126,6 +126,7 @@ protected void render(Block html) { .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) { continue; } + long logUploadedTime = thisNodeFile.getModificationTime(); reader = new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath()); @@ -164,7 +165,7 @@ protected void render(Block html) { } foundLog = readContainerLogs(html, logReader, logLimits, - desiredLogType); + desiredLogType, logUploadedTime); } catch (IOException ex) { LOG.error("Error getting logs for " + logEntity, ex); continue; @@ -189,7 +190,7 @@ protected void render(Block html) { private boolean readContainerLogs(Block html, AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits, - String desiredLogType) throws IOException { + String desiredLogType, long logUpLoadTime) throws IOException { int bufferSize = 65536; char[] cbuf = new char[bufferSize]; @@ -199,13 +200,12 @@ private boolean readContainerLogs(Block html, if (desiredLogType == null || desiredLogType.isEmpty() || desiredLogType.equals(logType)) { long logLength = logReader.getCurrentLogLength(); - long logUpLoadTime = logReader.getCurrentLogUpLoadTime(); if (foundLog) { html.pre()._("\n\n")._(); } html.p()._("Log Type: " + logType)._(); - html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._(); + html.p()._("Log UpLoad Time: " + Times.format(logUpLoadTime))._(); html.p()._("Log Length: " + Long.toString(logLength))._(); long start = logLimits.start < 0 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index bc0485e..320a8f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.util.Times; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -178,9 +179,16 @@ public void run() { logWriter.close(); } - //Verify the output generated by readAContainerLogs(DataInputStream, Writer) @Test public void testReadAcontainerLogs1() throws Exception { + //Verify the output generated by readAContainerLogs(DataInputStream, Writer, logUploadedTime) + testReadAcontainerLog(true); + + //Verify the output generated by readAContainerLogs(DataInputStream, Writer) + testReadAcontainerLog(false); + } + + private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { Configuration conf = new Configuration(); File workDir = new File(testWorkDir, "testReadAcontainerLogs1"); Path remoteAppLogFile = @@ -233,15 +241,21 @@ public void testReadAcontainerLogs1() throws Exception { LogKey rLogKey = new LogKey(); DataInputStream dis = logReader.next(rLogKey); Writer writer = new StringWriter(); - LogReader.readAcontainerLogs(dis, writer); - - // We should only do the log aggregation for stdout. + + if (logUploadedTime) { + LogReader.readAcontainerLogs(dis, writer, System.currentTimeMillis()); + } else { + LogReader.readAcontainerLogs(dis, writer); + } + + // We should only do the log aggregation for stdout. // Since we could not open the fileInputStream for stderr, this file is not // aggregated. String s = writer.toString(); int expectedLength = "\n\nLogType:stdout".length() - + ("\nLogUploadTime:" + System.currentTimeMillis()).length() + + (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System + .currentTimeMillis())).length() : 0) + ("\nLogLength:" + numChars).length() + "\nLog Contents:\n".length() + numChars; Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index cea71fa..b17c2a1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -759,30 +759,28 @@ private String verifyContainerLogs(LogAggregationService logAggregationService, try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); + // It will write out two empty line first + // So, to get the LogType, we need to bypass the empty lines LogReader.readAContainerLogsForALogType(valueStream, ps); String writtenLines[] = baos.toString().split( System.getProperty("line.separator")); - Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8)); - String fileType = writtenLines[0].substring(9); + Assert.assertEquals("LogType:", writtenLines[2].substring(0, 8)); + String fileType = writtenLines[2].substring(8); - Assert.assertEquals("LogUploadTime:", writtenLines[1].substring(0, 14)); - String fileUploadedTimeStr = writtenLines[1].substring(15); - - Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10)); - String fileLengthStr = writtenLines[2].substring(11); + Assert.assertEquals("LogLength:", writtenLines[3].substring(0, 10)); + String fileLengthStr = writtenLines[3].substring(10); long fileLength = Long.parseLong(fileLengthStr); Assert.assertEquals("Log Contents:", - writtenLines[3].substring(0, 13)); + writtenLines[4].substring(0, 13)); String logContents = StringUtils.join( - Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n"); + Arrays.copyOfRange(writtenLines, 5, writtenLines.length), "\n"); perContainerMap.put(fileType, logContents); LOG.info("LogType:" + fileType); - LOG.info("LogUploadTime:" + fileUploadedTimeStr); LOG.info("LogLength:" + fileLength); LOG.info("Log Contents:\n" + perContainerMap.get(fileType)); } catch (EOFException eof) {