diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index e1d1e00..73fb177 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -211,6 +212,17 @@ public void write(DataOutputStream out, Set pendingUploadFiles) Collections.sort(fileList); for (File logFile : fileList) { + + FileInputStream in = null; + try { + in = secureOpenFile(logFile); + } catch (IOException ex) { + String message = "Error opening log file. Log file : " + + logFile.getAbsolutePath() + ". " + ex.getMessage(); + LOG.error(message, ex); + continue; + } + final long fileLength = logFile.length(); // Write the logFile Type out.writeUTF(logFile.getName()); @@ -219,9 +231,7 @@ public void write(DataOutputStream out, Set pendingUploadFiles) out.writeUTF(String.valueOf(fileLength)); // Write the log itself - FileInputStream in = null; try { - in = SecureIOUtils.openForRead(logFile, getUser(), null); byte[] buf = new byte[65535]; int len = 0; long bytesLeft = fileLength; @@ -245,7 +255,7 @@ public void write(DataOutputStream out, Set pendingUploadFiles) this.uploadedFiles.add(logFile); } catch (IOException e) { String message = "Error aggregating log file. Log file : " - + logFile.getAbsolutePath() + e.getMessage(); + + logFile.getAbsolutePath() + ". " + e.getMessage(); LOG.error(message, e); out.write(message.getBytes()); } finally { @@ -256,6 +266,11 @@ public void write(DataOutputStream out, Set pendingUploadFiles) } } + @VisibleForTesting + public FileInputStream secureOpenFile(File logFile) throws IOException { + return SecureIOUtils.openForRead(logFile, getUser(), null); + } + // Added for testing purpose. public String getUser() { return user; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 676a156..7a20b5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doThrow; import java.io.BufferedReader; import java.io.DataInputStream; @@ -37,7 +38,6 @@ import java.util.concurrent.CountDownLatch; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -194,6 +194,8 @@ public void testReadAcontainerLogs1() throws Exception { int numChars = 80000; + // create file stderr and stdout in containerLogDir + writeSrcFile(srcFilePath, "stderr", numChars); writeSrcFile(srcFilePath, "stdout", numChars); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); @@ -204,7 +206,14 @@ public void testReadAcontainerLogs1() throws Exception { new LogValue(Collections.singletonList(srcFileRoot.toString()), testContainerId, ugi.getShortUserName()); - logWriter.append(logKey, logValue); + // When we try to open FileInputStream for stderr, it will throw out an IOException. + // Skip the log aggregation for stderr. + LogValue spyLogValue = spy(logValue); + File errorFile = new File((new Path(srcFilePath, "stderr")).toString()); + doThrow(new IOException("Mock can not open FileInputStream")).when( + spyLogValue).secureOpenFile(errorFile); + + logWriter.append(logKey, spyLogValue); logWriter.close(); // make sure permission are correct on the file @@ -218,11 +227,15 @@ public void testReadAcontainerLogs1() throws Exception { Writer writer = new StringWriter(); LogReader.readAcontainerLogs(dis, writer); + // We should only do the log aggregation for stdout. + // Since we could not open the fileInputStream for stderr, this file is not + // aggregated. String s = writer.toString(); int expectedLength = "\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length() + "\nLog Contents:\n".length() + numChars; Assert.assertTrue("LogType not matched", s.contains("LogType:stdout")); + Assert.assertTrue("stderr should not be aggregated.", !s.contains("LogType:stderr")); Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars)); Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));