diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 132dca2..2e9e92d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -227,6 +227,7 @@ public void testFetchApplictionLogs() throws Exception { assertTrue(exitCode == 0); assertTrue(sysOutStream.toString().contains( "Hello container_0_0001_01_000001!")); + assertTrue(sysOutStream.toString().contains("LogUploadTime")); fs.delete(new Path(remoteLogRootDir), true); fs.delete(new Path(rootLogDir), true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index e1d1e00..a45d894 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Times; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -205,8 +207,8 @@ public LogValue(List rootLogDirs, ContainerId containerId, return pendingUploadFiles; } - public void write(DataOutputStream out, Set pendingUploadFiles) - throws IOException { + public void write(DataOutputStream out, Set pendingUploadFiles, + long uploadedTime) throws IOException { List fileList = new ArrayList(pendingUploadFiles); Collections.sort(fileList); @@ -215,6 +217,9 @@ public void write(DataOutputStream out, Set pendingUploadFiles) // Write the logFile Type out.writeUTF(logFile.getName()); + // Write the uploaded TimeStamp + out.writeUTF(Times.format(uploadedTime)); + // Write the log length as UTF so that it is printable out.writeUTF(String.valueOf(fileLength)); @@ -261,7 +266,7 @@ public String getUser() { return user; } - private Set getPendingLogFilesToUpload(File containerLogDir) { + private List getPendingLogFilesToUpload(File containerLogDir) { Set candidates = new HashSet(Arrays.asList(containerLogDir.listFiles())); for (File logFile : candidates) { @@ -291,7 +296,16 @@ public boolean apply(File next) { }); candidates = Sets.newHashSet(mask); } - return candidates; + + // sort the file by lastModfiedTime. + List candidatesList = new ArrayList(candidates); + Collections.sort(candidatesList, new Comparator() { + public int compare(File s1, File s2) { + return s1.lastModified() < s2.lastModified() ? -1 + : s1.lastModified() > s2.lastModified() ? 1 : 0; + } + }); + return candidatesList; } private void filterFiles(String pattern, Set candidates, @@ -404,7 +418,8 @@ public void writeApplicationACLs(Map appAcls) out.close(); } - public void append(LogKey logKey, LogValue logValue) throws IOException { + public void append(LogKey logKey, LogValue logValue, long uploadTime) + throws IOException { Set pendingUploadFiles = logValue.getPendingLogFilesToUploadForThisContainer(); if (pendingUploadFiles.size() == 0) { @@ -414,10 +429,15 @@ public void append(LogKey logKey, LogValue logValue) throws IOException { logKey.write(out); out.close(); out = this.writer.prepareAppendValue(-1); - logValue.write(out, pendingUploadFiles); + logValue.write(out, pendingUploadFiles, uploadTime); out.close(); } + public void append(LogKey logKey, LogValue logValue) + throws IOException { + append(logKey, logValue, System.currentTimeMillis()); + } + public void close() { try { this.writer.close(); @@ -619,6 +639,7 @@ public static void readAcontainerLogs(DataInputStream valueStream, int bufferSize = 65536; char[] cbuf = new char[bufferSize]; String fileType; + String uploadTime; String fileLengthStr; long fileLength; @@ -629,10 +650,13 @@ public static void readAcontainerLogs(DataInputStream valueStream, // EndOfFile return; } + uploadTime = valueStream.readUTF(); fileLengthStr = valueStream.readUTF(); fileLength = Long.parseLong(fileLengthStr); writer.write("\n\nLogType:"); writer.write(fileType); + writer.write("\nLogUploadTime:"); + writer.write(uploadTime); writer.write("\nLogLength:"); writer.write(fileLengthStr); writer.write("\nLog Contents:\n"); @@ -664,10 +688,13 @@ public static void readAContainerLogsForALogType( byte[] buf = new byte[65535]; String fileType = valueStream.readUTF(); + String uploadTime = valueStream.readUTF(); String fileLengthStr = valueStream.readUTF(); long fileLength = Long.parseLong(fileLengthStr); out.print("LogType: "); out.println(fileType); + out.print("LogUploadTime: "); + out.println(uploadTime); out.print("LogLength: "); out.println(fileLengthStr); out.println("Log Contents:"); @@ -698,6 +725,7 @@ public void close() { public static class ContainerLogsReader { private DataInputStream valueStream; private String currentLogType = null; + private String currentLogUpLoadTime = null; private long currentLogLength = 0; private BoundedInputStream currentLogData = null; private InputStreamReader currentLogISR; @@ -718,12 +746,14 @@ public String nextLog() throws IOException { } currentLogType = null; + currentLogUpLoadTime = null; currentLogLength = 0; currentLogData = null; currentLogISR = null; try { String logType = valueStream.readUTF(); + String logUpLoadTime = valueStream.readUTF(); String logLengthStr = valueStream.readUTF(); currentLogLength = Long.parseLong(logLengthStr); currentLogData = @@ -731,6 +761,7 @@ public String nextLog() throws IOException { currentLogData.setPropagateClose(false); currentLogISR = new InputStreamReader(currentLogData); currentLogType = logType; + currentLogUpLoadTime = logUpLoadTime; } catch (EOFException e) { } @@ -741,6 +772,10 @@ public String getCurrentLogType() { return currentLogType; } + public String getCurrentLogUpLoadTime() { + return currentLogUpLoadTime; + } + public long getCurrentLogLength() { return currentLogLength; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java index 16e6359..dd60f96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java @@ -198,12 +198,13 @@ private boolean readContainerLogs(Block html, if (desiredLogType == null || desiredLogType.isEmpty() || desiredLogType.equals(logType)) { long logLength = logReader.getCurrentLogLength(); - + String logUpLoadTime = logReader.getCurrentLogUpLoadTime(); if (foundLog) { html.pre()._("\n\n")._(); } html.p()._("Log Type: " + logType)._(); + html.p()._("Log UpLoadTime: " + logUpLoadTime)._(); html.p()._("Log Length: " + Long.toString(logLength))._(); long start = logLimits.start < 0 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 676a156..0d15091 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -37,7 +37,6 @@ import java.util.concurrent.CountDownLatch; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.util.Times; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -219,8 +219,11 @@ public void testReadAcontainerLogs1() throws Exception { LogReader.readAcontainerLogs(dis, writer); String s = writer.toString(); + int expectedLength = - "\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length() + "\n\nLogType:stdout".length() + + ("\nLogUploadTime:" + Times.format(System.currentTimeMillis())) + .length() + ("\nLogLength:" + numChars).length() + "\nLog Contents:\n".length() + numChars; Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars)); @@ -232,7 +235,7 @@ public void testReadAcontainerLogs1() throws Exception { } String expectedContent = sb.toString(); Assert.assertTrue("Log content incorrect", s.contains(expectedContent)); - + System.out.println(s); Assert.assertEquals(expectedLength, s.length()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6e196bb..cda0bbe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -231,6 +231,7 @@ private void uploadLogsForContainers() { } boolean uploadedLogsInThisCycle = false; + long uploadTime = System.currentTimeMillis(); for (ContainerId container : pendingContainerInThisCycle) { ContainerLogAggregator aggregator = null; if (containerLogAggregators.containsKey(container)) { @@ -240,7 +241,7 @@ private void uploadLogsForContainers() { containerLogAggregators.put(container, aggregator); } Set uploadedFilePathsInThisCycle = - aggregator.doContainerLogAggregation(writer); + aggregator.doContainerLogAggregation(writer, uploadTime); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; } @@ -269,7 +270,7 @@ private void uploadLogsForContainers() { ? remoteNodeLogFileForApp : new Path( remoteNodeLogFileForApp.getParent(), remoteNodeLogFileForApp.getName() + "_" - + System.currentTimeMillis()); + + uploadTime); final boolean rename = uploadedLogsInThisCycle; try { @@ -491,7 +492,8 @@ public ContainerLogAggregator(ContainerId containerId) { this.containerId = containerId; } - public Set doContainerLogAggregation(LogWriter writer) { + public Set + doContainerLogAggregation(LogWriter writer, long uploadTime) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirs())); @@ -501,7 +503,7 @@ public ContainerLogAggregator(ContainerId containerId) { userUgi.getShortUserName(), logAggregationContext, this.uploadedFileMeta); try { - writer.append(logKey, logValue); + writer.append(logKey, logValue, uploadTime); } catch (Exception e) { LOG.error("Couldn't upload logs for " + containerId + ". Skipping this container."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index ab86a18..c0efa54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -770,15 +770,18 @@ private String verifyContainerLogs(LogAggregationService logAggregationService, Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8)); String fileType = writtenLines[0].substring(9); - Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10)); - String fileLengthStr = writtenLines[1].substring(11); + // we add one extra parameter: LogUploadedTime between LogType and + // LogLength. So increasing the array index to + // bypass the LogUploadedTime parameter + Assert.assertEquals("LogLength:", writtenLines[2].substring(0, 10)); + String fileLengthStr = writtenLines[2].substring(11); long fileLength = Long.parseLong(fileLengthStr); Assert.assertEquals("Log Contents:", - writtenLines[2].substring(0, 13)); + writtenLines[3].substring(0, 13)); String logContents = StringUtils.join( - Arrays.copyOfRange(writtenLines, 3, writtenLines.length), "\n"); + Arrays.copyOfRange(writtenLines, 4, writtenLines.length), "\n"); perContainerMap.put(fileType, logContents); LOG.info("LogType:" + fileType);