diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 2e9e92d..a27b0a2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -172,6 +173,7 @@ public void testFetchApplictionLogs() throws Exception { ApplicationAttemptIdPBImpl.newInstance(appId, 1); ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1); ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2); + ContainerId containerId3 = ContainerIdPBImpl.newInstance(appAttemptId, 3); NodeId nodeId = NodeId.newInstance("localhost", 1234); @@ -206,6 +208,11 @@ public void testFetchApplictionLogs() throws Exception { containerId1, path, fs); uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId, containerId2, path, fs); + // This will create an old version log file. + // FileName: oldVersionLogs + // FileContent: Hello, oldVersionLogs!! + uploadPreviousVersionContainerLogIntoRemoteDir(ugi, configuration, + rootLogDirs, nodeId, containerId3, path, fs); YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED); @@ -218,6 +225,7 @@ public void testFetchApplictionLogs() throws Exception { "Hello container_0_0001_01_000001!")); assertTrue(sysOutStream.toString().contains( "Hello container_0_0001_01_000002!")); + assertTrue(sysOutStream.toString().contains("Hello, oldVersionLogs!!")); sysOutStream.reset(); exitCode = @@ -228,7 +236,16 @@ public void testFetchApplictionLogs() throws Exception { assertTrue(sysOutStream.toString().contains( "Hello container_0_0001_01_000001!")); assertTrue(sysOutStream.toString().contains("LogUploadTime")); + sysOutStream.reset(); + exitCode = + cli.run(new String[] { "-applicationId", appId.toString(), + "-nodeAddress", nodeId.toString(), "-containerId", + containerId3.toString() }); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains("oldVersionLogs")); + assertTrue(sysOutStream.toString().contains("Hello, oldVersionLogs!!")); + assertTrue(!sysOutStream.toString().contains("LogUploadTime")); fs.delete(new Path(remoteLogRootDir), true); fs.delete(new Path(rootLogDir), true); } @@ -266,6 +283,37 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, writer.close(); } + private static void uploadPreviousVersionContainerLogIntoRemoteDir( + UserGroupInformation ugi, Configuration configuration, + List rootLogDirs, NodeId nodeId, ContainerId containerId, + Path appDir, FileSystem fs) throws Exception { + Path path = + new Path(appDir, LogAggregationUtils.getNodeString(nodeId) + + System.currentTimeMillis()); + AggregatedLogFormat.LogWriter writer = + new AggregatedLogFormat.LogWriter(configuration, path, ugi); + + writer.writeApplicationOwner(ugi.getUserName()); + + Map appAcls = + new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + writer.append(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + UserGroupInformation.getCurrentUser().getShortUserName())); + DataOutputStream out = writer.getWriter().prepareAppendKey(-1); + new AggregatedLogFormat.LogKey(containerId).write(out); + out.close(); + out = writer.getWriter().prepareAppendValue(-1); + out.writeUTF("oldVersionLogs"); + byte[] content = "Hello, oldVersionLogs!!".getBytes(); + out.writeUTF(String.valueOf(content.length)); + out.write(content); + out.close(); + writer.close(); + } + private YarnClient createMockYarnClient(YarnApplicationState appState) throws YarnException, IOException { YarnClient mockClient = mock(YarnClient.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 22219be..6c52211 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -86,7 +86,12 @@ private static final Map RESERVED_KEYS; //Maybe write out the retention policy. //Maybe write out a list of containerLogs skipped by the retention policy. - private static final int VERSION = 1; + //Change from 1 to 2, NM could wake up periodically to upload logs even + //when applications are Running. To enable this, we need set configure: + //yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds + //with proper value. + private static final int VERSION = 2; + private static final String LOG_UPLOADED_TIME_KEY = "logUploadedTime"; /** * Umask for the log file. @@ -234,7 +239,8 @@ public void write(DataOutputStream out, Set pendingUploadFiles) out.writeUTF(logFile.getName()); // Write the uploaded TimeStamp - out.writeLong(System.currentTimeMillis()); + out.writeUTF(String.valueOf(System.currentTimeMillis()) + ":" + + LOG_UPLOADED_TIME_KEY); // Write the log length as UTF so that it is printable out.writeUTF(String.valueOf(fileLength)); @@ -400,6 +406,11 @@ public FSDataOutputStream run() throws Exception { writeVersion(); } + @VisibleForTesting + public TFile.Writer getWriter() { + return this.writer; + } + private void writeVersion() throws IOException { DataOutputStream out = this.writer.prepareAppendKey(-1); VERSION_KEY.write(out); @@ -646,7 +657,6 @@ public static void readAcontainerLogs(DataInputStream valueStream, int bufferSize = 65536; char[] cbuf = new char[bufferSize]; String fileType; - long uploadTime; String fileLengthStr; long fileLength; @@ -657,13 +667,20 @@ public static void readAcontainerLogs(DataInputStream valueStream, // EndOfFile return; } - uploadTime = valueStream.readLong(); - fileLengthStr = valueStream.readUTF(); + String str = valueStream.readUTF(); + long uploadTime = parseLogUpLoadedTime(str); + if (uploadTime != -1) { + fileLengthStr = valueStream.readUTF(); + } else { + fileLengthStr = str; + } fileLength = Long.parseLong(fileLengthStr); writer.write("\n\nLogType:"); writer.write(fileType); - writer.write("\nLogUploadTime:"); - writer.write(String.valueOf(uploadTime)); + if (uploadTime != -1) { + writer.write("\nLogUploadTime:"); + writer.write(String.valueOf(uploadTime)); + } writer.write("\nLogLength:"); writer.write(fileLengthStr); writer.write("\nLog Contents:\n"); @@ -695,13 +712,21 @@ public static void readAContainerLogsForALogType( byte[] buf = new byte[65535]; String fileType = valueStream.readUTF(); - long uploadTime = valueStream.readLong(); - String fileLengthStr = valueStream.readUTF(); + String str = valueStream.readUTF(); + long uploadTime = parseLogUpLoadedTime(str); + String fileLengthStr; + if (uploadTime != -1) { + fileLengthStr = valueStream.readUTF(); + } else { + fileLengthStr = str; + } long fileLength = Long.parseLong(fileLengthStr); out.print("LogType: "); out.println(fileType); - out.print("LogUploadTime: "); - out.println(Times.format(uploadTime)); + if (uploadTime != -1) { + out.print("LogUploadTime: "); + out.println(Times.format(uploadTime)); + } out.print("LogLength: "); out.println(fileLengthStr); out.println("Log Contents:"); @@ -760,8 +785,14 @@ public String nextLog() throws IOException { try { String logType = valueStream.readUTF(); - long logUpLoadTime = valueStream.readLong(); - String logLengthStr = valueStream.readUTF(); + String logLengthStr; + String str = valueStream.readUTF(); + long logUpLoadTime = parseLogUpLoadedTime(str); + if (logUpLoadTime != -1) { + logLengthStr = valueStream.readUTF(); + } else { + logLengthStr = str; + } currentLogLength = Long.parseLong(logLengthStr); currentLogData = new BoundedInputStream(valueStream, currentLogLength); @@ -799,4 +830,11 @@ public int read(char[] buf, int off, int len) throws IOException { return currentLogISR.read(buf, off, len); } } + + private static long parseLogUpLoadedTime(String str) { + if (str.endsWith(LOG_UPLOADED_TIME_KEY)) { + return Long.parseLong(str.split(":")[0].trim()); + } + return -1; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index bba3258..ba39d94 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -205,7 +205,9 @@ private boolean readContainerLogs(Block html, } html.p()._("Log Type: " + logType)._(); - html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._(); + if (logUpLoadTime != -1) { + html.p()._("Log UpLoadTime: " + Times.format(logUpLoadTime))._(); + } html.p()._("Log Length: " + Long.toString(logLength))._(); long start = logLimits.start < 0