diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 70f1a71..66c0ee0 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -112,6 +112,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
@@ -286,6 +298,13 @@
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index b669332..2ad1063 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -462,7 +462,7 @@ public void close() {
@Public
@Evolving
- public static class LogReader {
+ public static class LogReader implements LogFormatReader {
private final FSDataInputStream fsDataIStream;
private final TFile.Reader.Scanner scanner;
@@ -486,6 +486,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
* @return the application owner.
* @throws IOException
*/
+ @Override
public String getApplicationOwner() throws IOException {
TFile.Reader.Scanner ownerScanner = reader.createScanner();
LogKey key = new LogKey();
@@ -508,6 +509,7 @@ public String getApplicationOwner() throws IOException {
* @return a map of the Application ACLs.
* @throws IOException
*/
+ @Override
public Map<ApplicationAccessType, String> getApplicationAcls()
throws IOException {
// TODO Seek directly to the key once a comparator is specified.
@@ -579,6 +581,7 @@ public DataInputStream next(LogKey key) throws IOException {
* @throws IOException
*/
@Private
+ @Override
public ContainerLogsReader getContainerLogsReader(
ContainerId containerId) throws IOException {
ContainerLogsReader logReader = null;
@@ -742,6 +745,7 @@ public static void readAContainerLogsForALogType(
readAContainerLogsForALogType(valueStream, out, -1);
}
+ @Override
public void close() {
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java
new file mode 100644
index 0000000..ed89992
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
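+/**
+ * Combines an application's per-node aggregated log files into a single
+ * combined log file plus a plain-text index file. The index file starts with
+ * a format version line, a line listing the application ACLs, and a line with
+ * the application owner, followed by one "containerId,offset,length" line per
+ * container pointing into the combined log file.
+ */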
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CombinedAggregatedLogFormat {
+
+ private static final Log LOG =
+ LogFactory.getLog(CombinedAggregatedLogFormat.class);
+ private static final int CURRENT_VERSION = 1;
+ private static final String DELIMITER = ",";
+ private static final String LINE_DELIMITER = System.lineSeparator();
+
+ private static StreamCopier streamCopier;
+
+ static {
+ // Allows us to inject failures while copying streams for testing
+ if (System.getProperty("TestCombinedAggregatedLogFormat", "false")
+ .equals("true")) {
+ streamCopier = new StreamCopierForTest();
+ } else {
+ streamCopier = new StreamCopier();
+ }
+ }
+
+ /**
+ * Umask for the log file.
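+ * The umask is the complement of 0640 (0640 ^ 0777 == 0137), so the log and
+ * index files are created with 640 permissions.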
+ */
+ private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
+ .createImmutable((short) (0640 ^ 0777));
+
+ public CombinedAggregatedLogFormat() {
+ }
+
+ public static class LogWriter {
+
+ private final FSDataOutputStream fsDataOStream;
+ private final OutputStreamWriter indexWriter;
+ private FileContext fc;
+ private long currentOffset;
+ private int indexLineCount;
+
+ public LogWriter(final Configuration conf, final Path indexFile,
+ final Path logFile, final UserGroupInformation userUgi)
+ throws IOException {
+ try {
+ this.fc =
+ userUgi.doAs(new PrivilegedExceptionAction<FileContext>() {
+ @Override
+ public FileContext run() throws Exception {
+ fc = FileContext.getFileContext(conf);
+ fc.setUMask(APP_LOG_FILE_UMASK);
+ return fc;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ currentOffset = fc.getFileStatus(logFile).getLen();
+
+ // Count how many of the three header lines the existing index file has
+ if (currentOffset > 0L) {
+ Scanner indexScanner = null;
+ try {
+ indexScanner = new Scanner(fc.open(indexFile), "utf-8");
+ indexLineCount = 0;
+ while (indexLineCount < 3 && indexScanner.hasNextLine()) {
+ indexScanner.nextLine();
+ indexLineCount++;
+ }
+ } finally {
+ if (indexScanner != null) {
+ indexScanner.close();
+ }
+ }
+ }
+ } catch (FileNotFoundException fnfe) {
+ currentOffset = 0L;
+ indexLineCount = 0;
+ }
+
+ this.fsDataOStream = fc.create(
+ logFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+ new Options.CreateOpts[]{});
+
+ indexWriter = new OutputStreamWriter(fc.create(
+ indexFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+ new Options.CreateOpts[]{}), "utf-8");
+ }
+
+ private void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
+ throws IOException {
+ indexWriter.write(LINE_DELIMITER + appAcls.size());
+ for (Map.Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
+ indexWriter.write(DELIMITER + entry.getKey()
+ + DELIMITER + entry.getValue());
+ }
+ }
+
+ public void append(AggregatedLogFormat.LogReader reader) throws IOException {
+ if (indexLineCount < 3) {
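+ // Intentional fall-through (no break statements): write whichever header
+ // lines are still missing, e.g. after a previously interrupted run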
+ switch (indexLineCount) {
+ case 0:
+ indexWriter.write(Integer.toString(CURRENT_VERSION));
+ indexLineCount++;
+ case 1:
+ writeApplicationACLs(reader.getApplicationAcls());
+ indexLineCount++;
+ case 2:
+ indexWriter.write(LINE_DELIMITER + reader.getApplicationOwner());
+ indexLineCount++;
+ }
+ }
+
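+ // Append each container's logs to the combined log file and record the
+ // container's offset and length in the index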
+ AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
+ DataInputStream valueStream = reader.next(key);
+ while (valueStream != null) {
+ indexWriter.write(LINE_DELIMITER + key);
+ long length = streamCopier.copy(valueStream, fsDataOStream);
+ indexWriter.write(DELIMITER + currentOffset + DELIMITER + length);
+ currentOffset += length;
+
+ // Next container
+ key = new AggregatedLogFormat.LogKey();
+ valueStream = reader.next(key);
+ }
+ }
+
+ public void close() {
+ org.apache.hadoop.io.IOUtils.closeStream(indexWriter);
+ org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream);
+ }
+ }
+
+ public static class LogReader implements LogFormatReader {
+
+ private final FSDataInputStream fsDataIStream;
+ private Map<ApplicationAccessType, String> acls;
+ private String owner;
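+ // Maps a container id (in string form) to the offset and length of its
+ // logs within the combined log file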
+ private Map<String, IndexEntry> index;
+
+ public LogReader(Configuration conf, Path indexFile, Path logFile)
+ throws IOException {
+ FileContext fileContext = FileContext.getFileContext(conf);
+
+ this.fsDataIStream = fileContext.open(logFile);
+
+ Scanner indexScanner = null;
+ try {
+ indexScanner = new Scanner(fileContext.open(indexFile), "utf-8");
+ loadIndexFile(indexScanner);
+ } finally {
+ org.apache.hadoop.io.IOUtils.closeStream(indexScanner);
+ }
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationAcls() {
+ return acls;
+ }
+
+ @Override
+ public String getApplicationOwner() {
+ return owner;
+ }
+
+ private void readApplicationACLs(Scanner indexScanner) throws IOException {
+ String line = indexScanner.nextLine();
+ Scanner sc = new Scanner(line);
+ sc.useDelimiter(DELIMITER);
+ int numAcls = sc.nextInt();
+ acls = new HashMap<ApplicationAccessType, String>(numAcls);
+ for (int i = 0; i < numAcls; i++) {
+ String appAccessOp = sc.next();
+ String aclString = sc.next();
+ acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
+ }
+ }
+
+ private void loadIndexFile(Scanner indexScanner) throws IOException {
+ String line = indexScanner.nextLine();
+ int version = Integer.parseInt(line);
+ if (version != CURRENT_VERSION) {
+ throw new IOException("Incorrect version: expected " + CURRENT_VERSION
+ + " but found " + version);
+ }
+ readApplicationACLs(indexScanner);
+ owner = indexScanner.nextLine();
+
+ index = new HashMap<String, IndexEntry>();
+ while (indexScanner.hasNextLine()) {
+ Scanner sc = new Scanner(indexScanner.nextLine());
+ sc.useDelimiter(DELIMITER);
+ try {
+ String containerId = sc.next();
+ long offset = sc.nextLong();
+ long length = sc.nextLong();
+ index.put(containerId, new IndexEntry(offset, length));
+ } catch (Exception e) {
+ // this entry is invalid or incomplete; skip it
+ LOG.debug("Invalid index entry; skipping", e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public Set<String> getContainerIds() {
+ return index.keySet();
+ }
+
+ /**
+ * Get a ContainerLogsReader to read the logs for
+ * the specified container.
+ *
+ * @param containerId the container whose logs are requested
+ * @return object to read the container's logs or null if the
+ * logs could not be found
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ @Override
+ public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader(
+ ContainerId containerId) throws IOException {
+ AggregatedLogFormat.ContainerLogsReader logReader = null;
+ IndexEntry iEntry = index.get(containerId.toString());
+ if (iEntry != null) {
+ fsDataIStream.seek(iEntry.getOffset());
+ BoundedInputStream boundedFsDataIStream =
+ new BoundedInputStream(fsDataIStream, iEntry.getLength());
+ DataInputStream valueStream = new DataInputStream(boundedFsDataIStream);
+ logReader = new AggregatedLogFormat.ContainerLogsReader(valueStream);
+ }
+ return logReader;
+ }
+
+ @Override
+ public void close() {
+ org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream);
+ }
+ }
+
+ private static class IndexEntry {
+ private long offset;
+ private long length;
+
+ public IndexEntry(long offset, long length) {
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+ }
+
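+ /**
+ * Copies a container's log data from the aggregated log value stream into
+ * the combined log file and returns the number of bytes copied.
+ */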
+ protected static class StreamCopier {
+ public long copy(InputStream input, OutputStream output)
+ throws IOException {
+ return IOUtils.copyLarge(input, output);
+ }
+ }
+
+ protected static class StreamCopierForTest extends StreamCopier {
+ @Override
+ public long copy(InputStream input, OutputStream output)
+ throws IOException {
+ String cause = System.getProperty("TestCombinedAggregatedLogFormat_Cause");
+ if (cause != null) {
+ if (cause.equals("IOException")) {
+ throw new IOException("Injected IO Failure!");
+ } else if (cause.equals("Exit")) {
+ System.exit(1);
+ }
+ }
+ return IOUtils.copyLarge(input, output);
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java
new file mode 100644
index 0000000..6ac440a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import java.io.IOException;
+import java.util.Map;
+
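+/**
+ * Common interface for readers of aggregated log formats, implemented by
+ * {@link AggregatedLogFormat.LogReader} and
+ * {@link CombinedAggregatedLogFormat.LogReader}.
+ */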
+public interface LogFormatReader {
+
+ public String getApplicationOwner() throws IOException;
+
+ public Map<ApplicationAccessType, String> getApplicationAcls()
+ throws IOException;
+
+ public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader(
+ ContainerId containerId) throws IOException;
+
+ public void close();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java
new file mode 100644
index 0000000..4c5feeb
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.TestContainerId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestCombinedAggregatedLogFormat {
+
+ private static final File testWorkDir = new File("target",
+ "TestCombinedAggregatedLogFormat");
+ private static final Configuration conf = new Configuration();
+ private static final FileSystem fs;
+
+ static {
+ try {
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ // Allows us to inject failures while copying streams
+ System.setProperty("TestCombinedAggregatedLogFormat", "true");
+ }
+
+ @Before
+ public void setup() throws Exception {
+ Path workDirPath = new Path(testWorkDir.getAbsolutePath());
+ if (fs.exists(workDirPath)) {
+ fs.delete(workDirPath, true);
+ }
+ fs.mkdirs(workDirPath);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ Path workDirPath = new Path(testWorkDir.getAbsolutePath());
+ fs.delete(workDirPath, true);
+ }
+
+ /**
+ * This test creates local log files for 2 containers, aggregates them, then
+ * combines them, and finally reads the combined aggregated log file. It
+ * then creates local log files for a 3rd container, repeats the process for
+ * it, and verifies that all 3 containers' log files are in the combined
+ * aggregated log file.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReaderWriter() throws Exception {
+ File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs");
+ Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(),
+ "logs");
+ ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1);
+ ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2);
+ ContainerId containerIdC = TestContainerId.newContainerId(1, 1, 1, 3);
+
+ Path remoteAppLogFileA =
+ writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA,
+ "hostA_1234");
+ Path remoteAppLogFileB =
+ writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB,
+ "hostB_1234");
+
+ Path indexFile =
+ new Path(testWorkDir.getAbsolutePath(),
+ containerIdA.getApplicationAttemptId().getApplicationId().toString()
+ + ".index");
+ Path logFile = new Path(testWorkDir.getAbsolutePath(),
+ containerIdA.getApplicationAttemptId().getApplicationId().toString());
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ CombinedAggregatedLogFormat.LogWriter cWriter = null;
+ AggregatedLogFormat.LogReader aReader = null;
+ try {
+ cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile,
+ logFile, ugi);
+ aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA);
+ cWriter.append(aReader);
+ aReader.close();
+ aReader = null;
+ aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB);
+ cWriter.append(aReader);
+ } finally {
+ if (cWriter != null) {
+ cWriter.close();
+ }
+ if (aReader != null) {
+ aReader.close();
+ }
+ }
+
+ CombinedAggregatedLogFormat.LogReader cReader = null;
+ try {
+ cReader =
+ new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile);
+ Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner());
+ Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}",
+ cReader.getApplicationAcls().toString());
+ Set<String> containerIds = cReader.getContainerIds();
+ Assert.assertEquals(2, containerIds.size());
+ Assert.assertTrue(containerIds.contains(containerIdA.toString()));
+ Assert.assertTrue(containerIds.contains(containerIdB.toString()));
+ for (ContainerId containerId :
+ new ContainerId[]{containerIdA, containerIdB}) {
+ AggregatedLogFormat.ContainerLogsReader logReader =
+ cReader.getContainerLogsReader(containerId);
+ int i = 0;
+ while (logReader.nextLog() != null) {
+ i++;
+ Assert.assertEquals("log" + i, logReader.getCurrentLogType());
+ char[] buf = new char[(int) logReader.getCurrentLogLength()];
+ logReader.read(buf, 0, buf.length);
+ Assert.assertEquals("test log" + i, new String(buf));
+ }
+ Assert.assertEquals(3, i);
+ }
+ } finally {
+ if (cReader != null) {
+ cReader.close();
+ }
+ }
+
+ Path remoteAppLogFileC =
+ writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdC,
+ "hostC_1234");
+ cWriter = null;
+ aReader = null;
+ try {
+ cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile,
+ logFile, ugi);
+ aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileC);
+ cWriter.append(aReader);
+ } finally {
+ if (cWriter != null) {
+ cWriter.close();
+ }
+ if (aReader != null) {
+ aReader.close();
+ }
+ }
+
+ cReader = null;
+ try {
+ cReader =
+ new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile);
+ Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner());
+ Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}",
+ cReader.getApplicationAcls().toString());
+ Set<String> containerIds = cReader.getContainerIds();
+ Assert.assertEquals(3, containerIds.size());
+ Assert.assertTrue(containerIds.contains(containerIdA.toString()));
+ Assert.assertTrue(containerIds.contains(containerIdB.toString()));
+ Assert.assertTrue(containerIds.contains(containerIdC.toString()));
+ for (ContainerId containerId :
+ new ContainerId[]{containerIdA, containerIdB, containerIdC}) {
+ AggregatedLogFormat.ContainerLogsReader logReader =
+ cReader.getContainerLogsReader(containerId);
+ int i = 0;
+ while (logReader.nextLog() != null) {
+ i++;
+ Assert.assertEquals("log" + i, logReader.getCurrentLogType());
+ char[] buf = new char[(int) logReader.getCurrentLogLength()];
+ logReader.read(buf, 0, buf.length);
+ Assert.assertEquals("test log" + i, new String(buf));
+ }
+ Assert.assertEquals(3, i);
+ }
+ } finally {
+ if (cReader != null) {
+ cReader.close();
+ }
+ }
+ }
+
+ /**
+ * This test creates local log files for 1 container and aggregates them,
+ * then combines them while injecting an IO failure during the copy, and
+ * finally reads the combined aggregated log file (verifying that the
+ * container is effectively not there). It then creates local log files
+ * for a 2nd container, repeats the process for it (without the IO failure),
+ * and verifies that the second container's log files are in the combined
+ * aggregated log file.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIncompleteIndexReaderWriter() throws Exception {
+ File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs");
+ Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(),
+ "logs");
+ ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1);
+ ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2);
+
+ Path remoteAppLogFileA =
+ writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA,
+ "hostA_1234");
+
+ Path indexFile =
+ new Path(testWorkDir.getAbsolutePath(),
+ containerIdA.getApplicationAttemptId().getApplicationId().toString()
+ + ".index");
+ Path logFile = new Path(testWorkDir.getAbsolutePath(),
+ containerIdA.getApplicationAttemptId().getApplicationId().toString());
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ CombinedAggregatedLogFormat.LogWriter cWriter = null;
+ AggregatedLogFormat.LogReader aReader = null;
+ try {
+ cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile,
+ logFile, ugi);
+ aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA);
+ // Inject IO Failure
+ System.setProperty("TestCombinedAggregatedLogFormat_Cause", "IOException");
+ try {
+ cWriter.append(aReader);
+ } catch (IOException ioe) {
+ if (!ioe.getMessage()
+ .equals("Injected IO Failure!")) {
+ throw ioe;
+ }
+ }
+ } finally {
+ if (cWriter != null) {
+ cWriter.close();
+ }
+ if (aReader != null) {
+ aReader.close();
+ }
+ System.setProperty("TestCombinedAggregatedLogFormat_Cause", "None");
+ }
+
+ // Verify that the index entry is incomplete
+ File iFile = new File(testWorkDir.getAbsolutePath(), indexFile.getName());
+ Scanner sc = null;
+ try {
+ sc = new Scanner(iFile);
+ String line = sc.nextLine();
+ Assert.assertEquals("1", line);
+ line = sc.nextLine();
+ Assert.assertEquals("1,VIEW_APP," + ugi.getUserName(), line);
+ line = sc.nextLine();
+ Assert.assertEquals(ugi.getUserName(), line);
+ line = sc.nextLine();
+ Assert.assertEquals(containerIdA.toString(), line);
+ Assert.assertFalse(sc.hasNextLine());
+ } finally {
+ if (sc != null) {
+ sc.close();
+ }
+ }
+
+ CombinedAggregatedLogFormat.LogReader cReader = null;
+ try {
+ cReader = new CombinedAggregatedLogFormat.LogReader(conf, indexFile,
+ logFile);
+ Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner());
+ Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}",
+ cReader.getApplicationAcls().toString());
+ Set<String> containerIds = cReader.getContainerIds();
+ Assert.assertEquals(0, containerIds.size());
+ AggregatedLogFormat.ContainerLogsReader logReader =
+ cReader.getContainerLogsReader(containerIdA);
+ Assert.assertNull(logReader);
+ } finally {
+ if (cReader != null) {
+ cReader.close();
+ }
+ }
+
+ Path remoteAppLogFileB =
+ writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB,
+ "hostB_1234");
+
+ cWriter = null;
+ aReader = null;
+ try {
+ cWriter =
+ new CombinedAggregatedLogFormat.LogWriter(conf, indexFile, logFile, ugi);
+ aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB);
+ cWriter.append(aReader);
+ } finally {
+ if (cWriter != null) {
+ cWriter.close();
+ }
+ if (aReader != null) {
+ aReader.close();
+ }
+ }
+
+ cReader = null;
+ try {
+ cReader =
+ new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile);
+ Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner());
+ Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}",
+ cReader.getApplicationAcls().toString());
+ Set<String> containerIds = cReader.getContainerIds();
+ Assert.assertEquals(1, containerIds.size());
+ Assert.assertTrue(containerIds.contains(containerIdB.toString()));
+ AggregatedLogFormat.ContainerLogsReader logReader =
+ cReader.getContainerLogsReader(containerIdB);
+ int i = 0;
+ while (logReader.nextLog() != null) {
+ i++;
+ Assert.assertEquals("log" + i, logReader.getCurrentLogType());
+ char[] buf = new char[(int) logReader.getCurrentLogLength()];
+ logReader.read(buf, 0, buf.length);
+ Assert.assertEquals("test log" + i, new String(buf));
+ }
+ Assert.assertEquals(3, i);
+ } finally {
+ if (cReader != null) {
+ cReader.close();
+ }
+ }
+ }
+
+ private Path writeAggregatedLog(File rootLogDirs,
+ Path remoteAggregatedLogDirs, ContainerId containerId, String node)
+ throws Exception {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ Path remoteAppDir = new Path(remoteAggregatedLogDirs,
+ containerId.getApplicationAttemptId().getApplicationId().toString());
+ if (!fs.exists(remoteAppDir)) {
+ assertTrue(fs.mkdirs(remoteAppDir));
+ }
+ Path remoteAppLogFile = new Path(remoteAppDir, node);
+ AggregatedLogFormat.LogWriter writer = null;
+ try {
+ writer = new AggregatedLogFormat.LogWriter(conf, remoteAppLogFile, ugi);
+ AggregatedLogFormat.LogKey logKey =
+ new AggregatedLogFormat.LogKey(containerId);
+ writer.writeApplicationOwner(ugi.getUserName());
+ Map<ApplicationAccessType, String> appAcls =
+ new HashMap<ApplicationAccessType, String>();
+ appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+ writer.writeApplicationACLs(appAcls);
+ File appLogDir = new File(rootLogDirs,
+ containerId.getApplicationAttemptId().getApplicationId().toString());
+ File containerLogDir = new File(appLogDir, containerId.toString());
+ writeLogs(containerLogDir.getAbsolutePath());
+ AggregatedLogFormat.LogValue logValue =
+ new AggregatedLogFormat.LogValue(
+ Arrays.asList(rootLogDirs.getAbsolutePath()), containerId,
+ ugi.getUserName());
+ writer.append(logKey, logValue);
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ return remoteAppLogFile;
+ }
+
+ private void writeLogs(String dirName) throws Exception {
+ File f = new File(dirName + File.separator + "log1");
+ if (!f.getParentFile().exists()) {
+ assertTrue(f.getParentFile().mkdirs());
+ }
+
+ writeLog(dirName + File.separator + "log1", "test log1");
+ writeLog(dirName + File.separator + "log2", "test log2");
+ writeLog(dirName + File.separator + "log3", "test log3");
+ }
+
+ private void writeLog(String fileName, String text) throws Exception {
+ File f = new File(fileName);
+ if (f.exists()) {
+ f.delete();
+ }
+ Writer writer = null;
+ try {
+ writer = new FileWriter(f);
+ writer.write(text);
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ }
+}