diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 70f1a71..66c0ee0 100644 --- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -112,6 +112,18 @@ + + + + + + + + + + + + @@ -286,6 +298,13 @@ + + + + + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index b669332..2ad1063 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -462,7 +462,7 @@ public void close() { @Public @Evolving - public static class LogReader { + public static class LogReader implements LogFormatReader { private final FSDataInputStream fsDataIStream; private final TFile.Reader.Scanner scanner; @@ -486,6 +486,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile) * @return the application owner. * @throws IOException */ + @Override public String getApplicationOwner() throws IOException { TFile.Reader.Scanner ownerScanner = reader.createScanner(); LogKey key = new LogKey(); @@ -508,6 +509,7 @@ public String getApplicationOwner() throws IOException { * @return a map of the Application ACLs. * @throws IOException */ + @Override public Map getApplicationAcls() throws IOException { // TODO Seek directly to the key once a comparator is specified. @@ -579,6 +581,7 @@ public DataInputStream next(LogKey key) throws IOException { * @throws IOException */ @Private + @Override public ContainerLogsReader getContainerLogsReader( ContainerId containerId) throws IOException { ContainerLogsReader logReader = null; @@ -742,6 +745,7 @@ public static void readAContainerLogsForALogType( readAContainerLogsForALogType(valueStream, out, -1); } + @Override public void close() { IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java new file mode 100644 index 0000000..ed89992 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/CombinedAggregatedLogFormat.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.BoundedInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CombinedAggregatedLogFormat { + + private static final Log LOG = + LogFactory.getLog(CombinedAggregatedLogFormat.class); + private static final int CURRENT_VERSION = 1; + private static final String DELIMITER = ","; + private static final String LINE_DELIMITER = System.lineSeparator(); + + private static StreamCopier streamCopier; + + static { + // Allows us to inject failures while copying streams for testing + if (System.getProperty("TestCombinedAggregatedLogFormat", "false") + .equals("true")) { + streamCopier = new StreamCopierForTest(); + } else { + streamCopier = new StreamCopier(); + } + } + + /** + * Umask for the log file. + */ + private static final FsPermission APP_LOG_FILE_UMASK = FsPermission + .createImmutable((short) (0640 ^ 0777)); + + public CombinedAggregatedLogFormat() { + } + + public static class LogWriter { + + private final FSDataOutputStream fsDataOStream; + private final OutputStreamWriter indexWriter; + private FileContext fc; + private long currentOffset; + private int indexLineCount; + + public LogWriter(final Configuration conf, final Path indexFile, + final Path logFile, final UserGroupInformation userUgi) + throws IOException { + try { + this.fc = + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public FileContext run() throws Exception { + fc = FileContext.getFileContext(conf); + fc.setUMask(APP_LOG_FILE_UMASK); + return fc; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + + try { + currentOffset = fc.getFileStatus(logFile).getLen(); + + // Verify the header is not missing any lines + if (currentOffset > 0l) { + Scanner indexScanner = null; + try { + indexScanner = new Scanner(fc.open(indexFile), "utf-8"); + indexLineCount = 0; + while (indexLineCount < 3 && indexScanner.hasNextLine()) { + indexScanner.nextLine(); + indexLineCount++; + } + } finally { + if (indexScanner != null) { + indexScanner.close(); + } + } + } + } catch (FileNotFoundException fnfe) { + currentOffset = 0l; + indexLineCount = 0; + } + + this.fsDataOStream = fc.create( + logFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), + new Options.CreateOpts[]{}); + + indexWriter = new OutputStreamWriter(fc.create( + indexFile, + EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), + new Options.CreateOpts[]{}), "utf-8"); + } + + private void writeApplicationACLs(Map appAcls) + throws IOException { + indexWriter.write(LINE_DELIMITER + appAcls.size()); + for (Map.Entry entry : appAcls.entrySet()) { + indexWriter.write(DELIMITER + entry.getKey() + + DELIMITER + entry.getValue()); + } + } + + public void append(AggregatedLogFormat.LogReader reader) throws IOException { + if (indexLineCount < 3) { + switch (indexLineCount) { + case 0: + indexWriter.write(Integer.toString(CURRENT_VERSION)); + indexLineCount++; + case 1: + writeApplicationACLs(reader.getApplicationAcls()); + indexLineCount++; + case 2: + indexWriter.write(LINE_DELIMITER + reader.getApplicationOwner()); + indexLineCount++; + } + } + + AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(); + DataInputStream valueStream = reader.next(key); + while (valueStream != null) { + indexWriter.write(LINE_DELIMITER + key); + long length = streamCopier.copy(valueStream, fsDataOStream); + indexWriter.write(DELIMITER + currentOffset + DELIMITER + length); + currentOffset += length; + + // Next container + key = new AggregatedLogFormat.LogKey(); + valueStream = reader.next(key); + } + } + + public void close() { + org.apache.hadoop.io.IOUtils.closeStream(indexWriter); + org.apache.hadoop.io.IOUtils.closeStream(fsDataOStream); + } + } + + public static class LogReader implements LogFormatReader { + + private final FSDataInputStream fsDataIStream; + private Map acls; + private String owner; + private Map index; + + public LogReader(Configuration conf, Path indexFile, Path logFile) throws IOException { + FileContext fileContext = FileContext.getFileContext(conf); + + this.fsDataIStream = fileContext.open(logFile); + + Scanner indexScanner = null; + try { + indexScanner = new Scanner(fileContext.open(indexFile), "utf-8"); + loadIndexFile(indexScanner); + } finally { + org.apache.hadoop.io.IOUtils.closeStream(indexScanner); + } + } + + @Override + public Map getApplicationAcls() { + return acls; + } + + @Override + public String getApplicationOwner() { + return owner; + } + + private void readApplicationACLs(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + Scanner sc = new Scanner(line); + sc.useDelimiter(DELIMITER); + int numAcls = sc.nextInt(); + acls = new HashMap(numAcls); + for (int i = 0; i < numAcls; i++) { + String appAccessOp = sc.next(); + String aclString = sc.next(); + acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString); + } + } + + private void loadIndexFile(Scanner indexScanner) throws IOException { + String line = indexScanner.nextLine(); + int version = Integer.parseInt(line); + if (version != CURRENT_VERSION) { + throw new IOException("Incorrect version: expected " + CURRENT_VERSION + + " but found " + version); + } + readApplicationACLs(indexScanner); + owner = indexScanner.nextLine(); + + index = new HashMap(); + while (indexScanner.hasNextLine()) { + Scanner sc = new Scanner(indexScanner.nextLine()); + sc.useDelimiter(DELIMITER); + try { + String containerId = sc.next(); + long offset = sc.nextLong(); + long length = sc.nextLong(); + index.put(containerId, new IndexEntry(offset, length)); + } catch (Exception e) { + // this entry is invalid or incomplete; skip it + LOG.debug("Invalid index entry; skipping", e); + } + } + } + + @VisibleForTesting + public Set getContainerIds() { + return index.keySet(); + } + + /** + * Get a ContainerLogsReader to read the logs for + * the specified container. + * + * @param containerId + * @return object to read the container's logs or null if the + * logs could not be found + * @throws IOException + */ + @InterfaceAudience.Private + @Override + public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException { + AggregatedLogFormat.ContainerLogsReader logReader = null; + IndexEntry iEntry = index.get(containerId.toString()); + if (iEntry != null) { + fsDataIStream.seek(iEntry.getOffset()); + BoundedInputStream boundedFsDataIStream = new BoundedInputStream(fsDataIStream, iEntry.getLength()); + DataInputStream valueStream = new DataInputStream(boundedFsDataIStream); + if (valueStream != null) { + logReader = new AggregatedLogFormat.ContainerLogsReader(valueStream); + } + } + return logReader; + } + + @Override + public void close() { + org.apache.hadoop.io.IOUtils.closeStream(fsDataIStream); + } + } + + private static class IndexEntry { + private long offset; + private long length; + + public IndexEntry(long offset, long length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; + } + } + + protected static class StreamCopier { + public long copy(InputStream input, OutputStream output) + throws IOException { + return IOUtils.copyLarge(input, output); + } + } + + protected static class StreamCopierForTest extends StreamCopier { + public long copy(InputStream input, OutputStream output) + throws IOException { + String cause = System.getProperty("TestCombinedAggregatedLogFormat_Cause"); + if (cause != null) { + if (cause.equals("IOException")) { + throw new IOException("Injected IO Failure!"); + } else if (cause.equals("Exit")) { + System.exit(1); + } + } + return IOUtils.copyLarge(input, output); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java new file mode 100644 index 0000000..6ac440a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogFormatReader.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.io.IOException; +import java.util.Map; + +public interface LogFormatReader { + + public String getApplicationOwner() throws IOException; + + public Map getApplicationAcls() + throws IOException; + + public AggregatedLogFormat.ContainerLogsReader getContainerLogsReader( + ContainerId containerId) throws IOException; + + public void close(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java new file mode 100644 index 0000000..4c5feeb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestCombinedAggregatedLogFormat.java @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.TestContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +public class TestCombinedAggregatedLogFormat { + + private static final File testWorkDir = new File("target", + "TestCombinedAggregatedLogFormat"); + private static final Configuration conf = new Configuration(); + private static final FileSystem fs; + + static { + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Allows us to inject failures while copying streams + System.setProperty("TestCombinedAggregatedLogFormat", "true"); + } + + @Before + public void setup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + if (fs.exists(workDirPath)) { + fs.delete(workDirPath, true); + } + fs.mkdirs(workDirPath); + } + + @After + public void cleanup() throws Exception { + Path workDirPath = new Path(testWorkDir.getAbsolutePath()); + fs.delete(workDirPath, true); + } + + /** + * This test creates local log files for 2 containers, aggregates them, then + * combines them, and finally reads the combined aggregated log file. It + * then creates local log files for a 3rd container, repeats the process for + * it, and verifies that all 3 containers' log files are in the combined + * aggregated log file. + * + * @throws Exception + */ + @Test + public void testReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + ContainerId containerIdC = TestContainerId.newContainerId(1, 1, 1, 3); + + Path remoteAppLogFileA = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA, + "hostA_1234"); + Path remoteAppLogFileB = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB, + "hostB_1234"); + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + CombinedAggregatedLogFormat.LogWriter cWriter = null; + AggregatedLogFormat.LogReader aReader = null; + try { + cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA); + cWriter.append(aReader); + aReader.close(); + aReader = null; + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + CombinedAggregatedLogFormat.LogReader cReader = null; + try { + cReader = + new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(2, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB}) { + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + + Path remoteAppLogFileC = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdC, + "hostC_1234"); + cWriter = null; + aReader = null; + try { + cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileC); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + cReader = null; + try { + cReader = + new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(3, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdA.toString())); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + Assert.assertTrue(containerIds.contains(containerIdC.toString())); + for (ContainerId containerId : + new ContainerId[]{containerIdA, containerIdB, containerIdC}) { + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerId); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + + /** + * This test creates local log files for 1 container, aggregates them, then + * combines them, then injects an IO failure while copying the data, and + * finally reads the combined aggregated log file (verifying that the + * container is effectively not there). It then creates local log files + * for a 2nd container, repeats the process for it (without the IO failure), + * and verifies that the second container's log files are in the combined + * aggregated log file. + * + * @throws Exception + */ + @Test + public void testIncompleteIndexReaderWriter() throws Exception { + File rootLogDirs = new File(testWorkDir.getAbsolutePath(), "local-logs"); + Path remoteAggregatedLogDirs = new Path(testWorkDir.getAbsolutePath(), + "logs"); + ContainerId containerIdA = TestContainerId.newContainerId(1, 1, 1, 1); + ContainerId containerIdB = TestContainerId.newContainerId(1, 1, 1, 2); + + Path remoteAppLogFileA = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdA, + "hostA_1234"); + + Path indexFile = + new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString() + + ".index"); + Path logFile = new Path(testWorkDir.getAbsolutePath(), + containerIdA.getApplicationAttemptId().getApplicationId().toString()); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + CombinedAggregatedLogFormat.LogWriter cWriter = null; + AggregatedLogFormat.LogReader aReader = null; + try { + cWriter = new CombinedAggregatedLogFormat.LogWriter(conf, indexFile, + logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileA); + // Inject IO Failure + System.setProperty("TestCombinedAggregatedLogFormat_Cause", "IOException"); + try { + cWriter.append(aReader); + } catch (IOException ioe) { + if (!ioe.getMessage() + .equals("Injected IO Failure!")) { + throw ioe; + } + } + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + System.setProperty("TestCombinedAggregatedLogFormat_Cause", "None"); + } + + // Verify that the index entry is incomplete + File iFile = new File(testWorkDir.getAbsolutePath(), indexFile.getName()); + StringBuilder sb = new StringBuilder(); + Scanner sc = null; + try { + sc = new Scanner(iFile); + String line = sc.nextLine(); + Assert.assertEquals("1", line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + Assert.assertEquals("1,VIEW_APP," + ugi.getUserName(), line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + Assert.assertEquals(ugi.getUserName(), line); + sb.append(line).append(System.lineSeparator()); + line = sc.nextLine(); + Assert.assertEquals(containerIdA.toString(), line); + Assert.assertFalse(sc.hasNextLine()); + } finally { + if (sc != null) { + sc.close(); + } + } + + CombinedAggregatedLogFormat.LogReader cReader = null; + try { + cReader = new CombinedAggregatedLogFormat.LogReader(conf, indexFile, + logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(0, containerIds.size()); + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerIdA); + Assert.assertNull(logReader); + } finally { + if (cReader != null) { + cReader.close(); + } + } + + Path remoteAppLogFileB = + writeAggregatedLog(rootLogDirs, remoteAggregatedLogDirs, containerIdB, + "hostB_1234"); + + cWriter = null; + aReader = null; + try { + cWriter = + new CombinedAggregatedLogFormat.LogWriter(conf, indexFile, logFile, ugi); + aReader = new AggregatedLogFormat.LogReader(conf, remoteAppLogFileB); + cWriter.append(aReader); + } finally { + if (cWriter != null) { + cWriter.close(); + } + if (aReader != null) { + aReader.close(); + } + } + + cReader = null; + try { + cReader = + new CombinedAggregatedLogFormat.LogReader(conf, indexFile, logFile); + Assert.assertEquals(ugi.getUserName(), cReader.getApplicationOwner()); + Assert.assertEquals("{VIEW_APP=" + ugi.getUserName() + "}", + cReader.getApplicationAcls().toString()); + Set containerIds = cReader.getContainerIds(); + Assert.assertEquals(1, containerIds.size()); + Assert.assertTrue(containerIds.contains(containerIdB.toString())); + AggregatedLogFormat.ContainerLogsReader logReader = + cReader.getContainerLogsReader(containerIdB); + int i = 0; + while (logReader.nextLog() != null) { + i++; + Assert.assertEquals("log" + i, logReader.getCurrentLogType()); + char[] buf = new char[(int) logReader.getCurrentLogLength()]; + logReader.read(buf, 0, buf.length); + Assert.assertEquals("test log" + i, new String(buf)); + } + Assert.assertEquals(i, 3); + } finally { + if (cReader != null) { + cReader.close(); + } + } + } + + private Path writeAggregatedLog(File rootLogDirs, + Path remoteAggregatedLogDirs, ContainerId containerId, String node) + throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Path remoteAppDir = new Path(remoteAggregatedLogDirs, + containerId.getApplicationAttemptId().getApplicationId().toString()); + if (!fs.exists(remoteAppDir)) { + assertTrue(fs.mkdirs(remoteAppDir)); + } + Path remoteAppLogFile = new Path(remoteAppDir, node); + AggregatedLogFormat.LogWriter writer = null; + try { + writer = new AggregatedLogFormat.LogWriter(conf, remoteAppLogFile, ugi); + AggregatedLogFormat.LogKey logKey = + new AggregatedLogFormat.LogKey(containerId); + writer.writeApplicationOwner(ugi.getUserName()); + Map appAcls = + new HashMap(); + appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); + writer.writeApplicationACLs(appAcls); + File appLogDir = new File(rootLogDirs, + containerId.getApplicationAttemptId().getApplicationId().toString()); + File containerLogDir = new File(appLogDir, containerId.toString()); + writeLogs(containerLogDir.getAbsolutePath()); + AggregatedLogFormat.LogValue logValue = + new AggregatedLogFormat.LogValue( + Arrays.asList(rootLogDirs.getAbsolutePath()), containerId, + ugi.getUserName()); + writer.append(logKey, logValue); + } finally { + if (writer != null) { + writer.close(); + } + } + return remoteAppLogFile; + } + + private void writeLogs(String dirName) throws Exception { + File f = new File(dirName + File.separator + "log1"); + if (!f.getParentFile().exists()) { + assertTrue(f.getParentFile().mkdirs()); + } + + writeLog(dirName + File.separator + "log1", "test log1"); + writeLog(dirName + File.separator + "log2", "test log2"); + writeLog(dirName + File.separator + "log3", "test log3"); + } + + private void writeLog(String fileName, String text) throws Exception { + File f = new File(fileName); + if (f.exists()) { + f.delete(); + } + Writer writer = null; + try { + writer = new FileWriter(f); + writer.write(text); + } finally { + if (writer != null) { + writer.close(); + } + } + } +}