diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index a434ef5..3a4669c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -208,68 +208,74 @@ public LogValue(List rootLogDirs, ContainerId containerId, } return pendingUploadFiles; } - - public void write(DataOutputStream out, Set pendingUploadFiles) - throws IOException { - List fileList = new ArrayList(pendingUploadFiles); - Collections.sort(fileList); - + + private void write(DataOutputStream out, List fileList) throws IOException { for (File logFile : fileList) { - // We only aggregate top level files. - // Ignore anything inside sub-folders. if (logFile.isDirectory()) { - LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it."); - continue; + // Aggregate log files under subfolders + LOG.info(logFile.getAbsolutePath() + " is a directory. " + + "Aggregate files inside it."); + List logFiles = Arrays.asList(logFile.listFiles()); + Collections.sort(fileList); + write(out, logFiles); } + else { + FileInputStream in = null; + try { + in = secureOpenFile(logFile); + } catch (IOException e) { + logErrorMessage(logFile, e); + IOUtils.cleanup(LOG, in); + continue; + } - FileInputStream in = null; - try { - in = secureOpenFile(logFile); - } catch (IOException e) { - logErrorMessage(logFile, e); - IOUtils.cleanup(LOG, in); - continue; - } + final long fileLength = logFile.length(); + // Write the logFile Type + out.writeUTF(logFile.getName()); + + // Write the log length as UTF so that it is printable + out.writeUTF(String.valueOf(fileLength)); - final long fileLength = logFile.length(); - // Write the logFile Type - out.writeUTF(logFile.getName()); - - // Write the log length as UTF so that it is printable - out.writeUTF(String.valueOf(fileLength)); - - // Write the log itself - try { - byte[] buf = new byte[65535]; - int len = 0; - long bytesLeft = fileLength; - while ((len = in.read(buf)) != -1) { - //If buffer contents within fileLength, write - if (len < bytesLeft) { - out.write(buf, 0, len); - bytesLeft-=len; + // Write the log itself + try { + byte[] buf = new byte[65535]; + int len = 0; + long bytesLeft = fileLength; + while ((len = in.read(buf)) != -1) { + //If buffer contents within fileLength, write + if (len < bytesLeft) { + out.write(buf, 0, len); + bytesLeft-=len; + } + //else only write contents within fileLength, then exit early + else { + out.write(buf, 0, (int)bytesLeft); + break; + } } - //else only write contents within fileLength, then exit early - else { - out.write(buf, 0, (int)bytesLeft); - break; + long newLength = logFile.length(); + if(fileLength < newLength) { + LOG.warn("Aggregated logs truncated by approximately "+ + (newLength-fileLength) +" bytes."); } + this.uploadedFiles.add(logFile); + } catch (IOException e) { + String message = logErrorMessage(logFile, e); + out.write(message.getBytes()); + } finally { + IOUtils.cleanup(LOG, in); } - long newLength = logFile.length(); - if(fileLength < newLength) { - LOG.warn("Aggregated logs truncated by approximately "+ - (newLength-fileLength) +" bytes."); - } - this.uploadedFiles.add(logFile); - } catch (IOException e) { - String message = logErrorMessage(logFile, e); - out.write(message.getBytes()); - } finally { - IOUtils.cleanup(LOG, in); } } } + public void write(DataOutputStream out, Set pendingUploadFiles) + throws IOException { + List fileList = new ArrayList(pendingUploadFiles); + Collections.sort(fileList); + write(out, fileList); + } + @VisibleForTesting public FileInputStream secureOpenFile(File logFile) throws IOException { return SecureIOUtils.openForRead(logFile, getUser(), null); @@ -288,10 +294,16 @@ public String getUser() { } private Set getPendingLogFilesToUpload(File containerLogDir) { - Set candidates = - new HashSet(Arrays.asList(containerLogDir.listFiles())); - for (File logFile : candidates) { - this.allExistingFileMeta.add(getLogFileMetaData(logFile)); + Set candidates = new HashSet(); + List fileList = Arrays.asList(containerLogDir.listFiles()); + for (File logFile : fileList) { + if(logFile.isDirectory()) { + Set subDirCandidates = getPendingLogFilesToUpload(logFile); + candidates.addAll(subDirCandidates); + } else { + candidates.add(logFile); + this.allExistingFileMeta.add(getLogFileMetaData(logFile)); + } } if (this.logAggregationContext != null && candidates.size() > 0) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 4301bc9..b243742 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -253,14 +253,14 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception { // aggregated. String s = writer.toString(); int expectedLength = - "LogType:stdout".length() - + (logUploadedTime ? ("\nLog Upload Time:" + Times.format(System + "LogType:stdout".length() + "LogType:logs".length() + + (2*((logUploadedTime ? ("\nLog Upload Time:" + Times.format(System .currentTimeMillis())).length() : 0) + ("\nLogLength:" + numChars).length() - + "\nLog Contents:\n".length() + numChars + "\n".length(); + + "\nLog Contents:\n".length() + numChars + "\n".length())); Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr")); - Assert.assertTrue("log file:logs should not be aggregated.", !s.contains("LogType:logs")); + Assert.assertTrue("log file:logs should not be aggregated.", s.contains("LogType:logs")); Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars)); Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));